| author | wangkuan <[email protected]> | 2024-07-05 17:55:09 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-07-05 17:55:09 +0800 |
| commit | 05f2efaea8e3ce6c5798cf38ebb4caef9291142c | |
| tree | a6a8c79cb9ebaa3d5583ff47f1d448d5d58fb0fe | |
| parent | bbb50013547e0bf83bb64a7611b911a5a92a41c0 | |
[improve][core] Support reading knowledge bases from HDFS
10 files changed, 213 insertions, 25 deletions
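
This commit lets a knowledge base be fetched from HDFS in addition to HTTP and the local filesystem: a new `fs_type: hdfs` is routed through a new `HdfsUtils` helper, and the per-knowledge-base `properties` map carries the HDFS client settings. Based on the commented-out sample this commit adds to `config/grootstream.yaml`, an enabled entry would look roughly like this sketch (name, path, file names, and address all come from that sample):

```yaml
# Sketch of an hdfs-backed knowledge base entry, uncommented from the
# sample added below; 192.168.44.12 is the sample's default address.
- name: tsg_ip_asn
  fs_type: hdfs        # dispatched to HdfsUtils by readFileFromFileSystem
  fs_path: /kb         # HDFS directory holding the knowledge base files
  files:
    - asn_builtin.mmdb
  properties:
    dfs.namenodes: 192.168.44.12  # comma-separated; more than one entry enables HA
    dfs.replication: 1
    # dfs.port: 9000              # defaults used when omitted
    # dfs.user: root
```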
```diff
diff --git a/config/grootstream.yaml b/config/grootstream.yaml
index acb19b1..df9c82b 100644
--- a/config/grootstream.yaml
+++ b/config/grootstream.yaml
@@ -11,6 +11,33 @@ grootstream:
         files:
           - 64af7077-eb9b-4b8f-80cf-2ceebc89bea9
           - 004390bc-3135-4a6f-a492-3662ecb9e289
+
+      # - name: tsg_ip_asn
+      #   fs_type: hdfs
+      #   fs_path: /kb
+      #   files:
+      #     - asn_builtin.mmdb
+      #   properties:
+      #     dfs.namenodes: 192.168.44.12
+      #     dfs.replication: 1
+      #     #dfs.port: 9000
+      #     #dfs.user: root
+      #     #dfs.nameservices: ns1
+      #     #dfs.ha.namenodes.ns1: nn1,nn2
+
+      # - name: tsg_ip_location
+      #   fs_type: hdfs
+      #   fs_path: /kb
+      #   files:
+      #     - ip_user_defined.mmdb
+      #     - ip_builtin.mmdb
+      #   properties:
+      #     dfs.namenodes: 192.168.44.12
+      #     dfs.replication: 1
+      #     #dfs.port: 9000
+      #     #dfs.user: root
+      #     #dfs.nameservices: ns1
+      #     #dfs.ha.namenodes.ns1: nn1,nn2
     properties:
       hos.path: http://192.168.44.12:9098/hos
       hos.bucket.name.traffic_file: traffic_file_bucket
diff --git a/groot-common/pom.xml b/groot-common/pom.xml
index d6463cb..f00cfc9 100644
--- a/groot-common/pom.xml
+++ b/groot-common/pom.xml
@@ -86,7 +86,21 @@
             <scope>provided</scope>
         </dependency>
 
-
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
     </dependencies>
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java b/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java
new file mode 100644
index 0000000..def3866
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java
@@ -0,0 +1,101 @@
+package com.geedgenetworks.common.utils;
+
+import com.alibaba.fastjson2.JSONArray;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.shaded.com.google.common.base.Splitter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.*;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+@Slf4j
+public class HdfsUtils {
+
+    private static FileSystem fs;
+    private static HdfsUtils hdfsInstance;
+
+    private HdfsUtils(Map<String, String> hdfsProp) {
+        Configuration config = new Configuration();
+        String paths = hdfsProp.getOrDefault("dfs.namenodes","192.168.44.12");
+        String replication = hdfsProp.getOrDefault("dfs.replication","1");
+        String port = hdfsProp.getOrDefault("dfs.port","9000");
+        String name = hdfsProp.getOrDefault("dfs.user","root");
+        List<String> hadoopNameNodes = new ArrayList<>();
+        String defaultFS = "";
+        if (!paths.isEmpty()) {
+            try {
+                hadoopNameNodes = Splitter.on(",").trimResults().splitToList(paths);
+            } catch (RuntimeException e) {
+                throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param of hadoop.namenodes is illegal");
+            }
+        }
+        if (hadoopNameNodes.size() == 1) {
+            defaultFS = "hdfs://" + hadoopNameNodes.get(0) + ":" + port;
+            config.set("fs.defaultFS", defaultFS);
+            config.set("dfs.replication", replication);
+        } else if (hadoopNameNodes.size() > 1) {
+            String nameservices = hdfsProp.getOrDefault("dfs.nameservices","ns1");
+            String ns1 = hdfsProp.getOrDefault("dfs.ha.namenodes.ns1","nn1,nn2");
+            defaultFS = "hdfs://" + nameservices;
+            config.set("fs.defaultFS", defaultFS);
+            config.set("dfs.nameservices", nameservices);
+            config.set("dfs.ha.namenodes.ns1", ns1);
+            config.set("dfs.namenode.rpc-address.ns1.nn1", hadoopNameNodes.get(0) + ":" + port);
+            config.set("dfs.namenode.rpc-address.ns1.nn2", hadoopNameNodes.get(1) + ":" + port);
+            config.set("dfs.client.failover.proxy.provider.ns1", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
+            config.set("fs.AbstractFileSystem.hdfs.impl", "org.apache.hadoop.fs.Hdfs");
+            config.set("dfs.replication", replication);
+        }
+        try {
+            fs = FileSystem.get(new URI(defaultFS), config, name);
+        } catch (IOException | URISyntaxException | InterruptedException e) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "create hdfs fileSystem error",e);
+        }
+    }
+
+    public static HdfsUtils getHdfsUtilInstance(Map<String, String> hdfsProp) {
+        if (hdfsInstance == null) {
+            hdfsInstance = new HdfsUtils(hdfsProp);
+        }
+        return hdfsInstance;
+    }
+
+    public byte[] readBytes(Path hdfsPath) {
+
+        try {
+            if (fs.exists(hdfsPath)) {
+                try (FSDataInputStream inputStream = fs.open(hdfsPath);
+                     ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+                    byte[] buffer = new byte[1024];
+                    int bytesRead;
+                    while ((bytesRead = inputStream.read(buffer)) != -1) {
+                        outputStream.write(buffer, 0, bytesRead);
+                    }
+                    return outputStream.toByteArray();
+                } catch (RuntimeException e) {
+                    String errorMsg = String.format("get hdfs file [%s] failed", hdfsPath);
+                    throw new GrootStreamRuntimeException(
+                            CommonErrorCode.FILE_OPERATION_ERROR, errorMsg, e);
+                }
+            }
+            throw new GrootStreamRuntimeException(
+                    CommonErrorCode.FILE_OPERATION_ERROR, String.format("hdfs filepath [%s] is not exists", hdfsPath));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file
```
```diff
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java
index 6ab8cc3..4d2119a 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java
@@ -4,13 +4,16 @@ import cn.hutool.core.io.file.PathUtil;
 import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONArray;
 import com.geedgenetworks.common.config.KnowledgeBaseConfig;
-import com.geedgenetworks.common.utils.FileUtils;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.common.utils.HdfsUtils;
 import com.geedgenetworks.core.pojo.KnowLedgeBaseFileMeta;
 import com.geedgenetworks.core.utils.HttpClientPoolUtil;
 import lombok.extern.slf4j.Slf4j;
 
 import java.net.URI;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
@@ -65,7 +68,17 @@ public abstract class AbstractKnowledgeBaseHandler {
     public static byte[] readFileFromLocalPath(Path filePath) {
         return PathUtil.readBytes(filePath);
     }
-
+    public static byte[] readFileFromFileSystem(Map<String, String> knowledgeBaseProp, String filePath,String fileName,String fileSystemType) {
+
+        switch (fileSystemType) {
+            case "hdfs":
+                return HdfsUtils.getHdfsUtilInstance(knowledgeBaseProp).readBytes(new org.apache.hadoop.fs.Path(filePath, fileName));
+            case "local":
+                return PathUtil.readBytes(Paths.get(filePath, fileName));
+            default:
+                throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT,fileSystemType + " is illegal");
+        }
+    }
 
     public static List<KnowLedgeBaseFileMeta> getMetadata(String type, String path) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java
index 95f9291..be64fd9 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractMultipleKnowledgeBaseHandler.java
@@ -64,10 +64,9 @@ public abstract class AbstractMultipleKnowledgeBaseHandler extends AbstractKnowl
         if ("http".equals(knowledgeBaseConfig.getFsType())) {
             return readFileFromKnowledgeBase(knowledgeMetedataCacheMap.get(encodeId(id)).getPath(), knowledgeMetedataCacheMap.get(encodeId(id)).getIsValid());
         }
-        if ("local".equals(knowledgeBaseConfig.getFsType())) {
-            return readFileFromLocalPath(Paths.get(knowledgeBaseConfig.getFsPath(), id.toString()));
+        else {
+            return readFileFromFileSystem(knowledgeBaseConfig.getProperties(),knowledgeBaseConfig.getFsPath(), knowledgeBaseConfig.getFiles().get(0),knowledgeBaseConfig.getFsType());
         }
-        throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, knowledgeBaseConfig.getFsType() + " is illegal");
     }
 
     protected List<Long> getAllFileIds() {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java
index 99a631d..d38097a 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractSingleKnowledgeBaseHandler.java
@@ -60,15 +60,10 @@
      */
     public byte[] downloadFile() {
         byte[] bytes;
-        switch (knowledgeBaseConfig.getFsType()) {
-            case "http":
-                bytes = readFileFromKnowledgeBase(knowledgeMetedataCache.getPath(), knowledgeMetedataCache.getIsValid());
-                break;
-            case "local":
-                bytes = readFileFromLocalPath(Paths.get(knowledgeBaseConfig.getFsPath(), knowledgeBaseConfig.getFiles().get(0)));
-                break;
-            default:
-                throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, knowledgeBaseConfig.getFsType() + " is illegal");
+        if (knowledgeBaseConfig.getFsType().equals("http")) {
+            bytes = readFileFromKnowledgeBase(knowledgeMetedataCache.getPath(), knowledgeMetedataCache.getIsValid());
+        } else {
+            bytes = readFileFromFileSystem(knowledgeBaseConfig.getProperties(), knowledgeBaseConfig.getFsPath(), knowledgeBaseConfig.getFiles().get(0), knowledgeBaseConfig.getFsType());
         }
         return decrypt(bytes);
     }
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java
index c367d2d..a1927db 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AsnKnowledgeBaseHandler.java
@@ -9,6 +9,7 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.ByteArrayInputStream;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
@@ -105,8 +106,7 @@ public class AsnKnowledgeBaseHandler extends AbstractKnowledgeBaseHandler {
         IpLookupV2.Builder asnLookupBuilder = new IpLookupV2.Builder(false);
         for (int i = 0; i < knowledgeBaseConfig.getFiles().size(); i++) {
             try {
-                byte[] bytes = readFileFromLocalPath(Paths.get(knowledgeBaseConfig.getFsPath(), knowledgeBaseConfig.getFiles().get(i)));
-
+                byte[] bytes = readFileFromFileSystem(knowledgeBaseConfig.getProperties(),knowledgeBaseConfig.getFsPath(), knowledgeBaseConfig.getFiles().get(i),knowledgeBaseConfig.getFsType());
                 switch (i) {
                     case 0:
                         asnLookupBuilder.loadAsnDataFile(new ByteArrayInputStream(bytes));
@@ -130,6 +130,8 @@ public class AsnKnowledgeBaseHandler extends AbstractKnowledgeBaseHandler {
         return true;
     }
 
+
+
     private boolean buildKnowledgeBaseWithMeta(KnowledgeBaseConfig knowledgeBaseConfig) {
         IpLookupV2.Builder asnLookupBuilder = new IpLookupV2.Builder(false);
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/GeoIpKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/GeoIpKnowledgeBaseHandler.java
index 99b64a8..13f9a7e 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/GeoIpKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/GeoIpKnowledgeBaseHandler.java
@@ -91,8 +91,7 @@ public class GeoIpKnowledgeBaseHandler extends AbstractKnowledgeBaseHandler {
         IpLookupV2.Builder ipLookupBuilder = new IpLookupV2.Builder(false);
         for (int i = 0; i < knowledgeBaseConfig.getFiles().size(); i++) {
             try {
-                byte[] bytes = readFileFromLocalPath(Paths.get(knowledgeBaseConfig.getFsPath(), knowledgeBaseConfig.getFiles().get(i)));
-
+                byte[] bytes = readFileFromFileSystem(knowledgeBaseConfig.getProperties(),knowledgeBaseConfig.getFsPath(), knowledgeBaseConfig.getFiles().get(i),knowledgeBaseConfig.getFsType());
                 switch (i) {
                     case 0:
                         ipLookupBuilder.loadDataFile(new ByteArrayInputStream(bytes));
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java
index 2e187b3..20defe3 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/RuleKnowledgeBaseHandler.java
@@ -69,15 +69,12 @@ public class RuleKnowledgeBaseHandler extends AbstractKnowledgeBaseHandler {
     public void updateCache() {
         if ("http".equals(knowledgeBaseConfig.getFsType())) {
             requestApi();
-            return;
         }
-        if ("local".equals(knowledgeBaseConfig.getFsType())) {
-            byte[] bytes = readFileFromLocalPath(Paths.get(knowledgeBaseConfig.getFsPath()));
+        else {
+            byte[] bytes = readFileFromFileSystem(knowledgeBaseConfig.getProperties(), knowledgeBaseConfig.getFsPath(), "", knowledgeBaseConfig.getFsType());
             RuleResponse ruleResponse = JSON.parseObject(new String(bytes), RuleResponse.class);
             processResponse(ruleResponse);
-            return;
         }
-        throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, knowledgeBaseConfig.getFsType() + " is illegal");
     }
 
     private void requestApi() {
diff --git a/pom.xml b/pom.xml
@@ -65,6 +65,7 @@
         <config.version>1.3.3</config.version>
         <hazelcast.version>5.1</hazelcast.version>
         <quartz.version>2.3.2</quartz.version>
+        <hadoop.version>2.7.1</hadoop.version>
         <!--Option config-->
         <test.dependency.skip>true</test.dependency.skip>
         <skip.spotless>true</skip.spotless>
@@ -76,7 +77,6 @@
 
     <dependencyManagement>
         <dependencies>
-
             <dependency>
                 <groupId>org.slf4j</groupId>
                 <artifactId>slf4j-api</artifactId>
@@ -434,6 +434,47 @@
                 <scope>provided</scope>
             </dependency>
 
+            <!-- <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-client</artifactId>
+                <version>2.7.1</version>
+                <exclusions>
+                    <exclusion>
+                        <artifactId>slf4j-log4j12</artifactId>
+                        <groupId>org.slf4j</groupId>
+                    </exclusion>
+                    <exclusion>
+                        <artifactId>log4j-over-slf4j</artifactId>
+                        <groupId>org.slf4j</groupId>
+                    </exclusion>
+                    <exclusion>
+                        <artifactId>servlet-api</artifactId>
+                        <groupId>javax.servlet</groupId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-common</artifactId>
+                <version>2.7.1</version>
+                <exclusions>
+                    <exclusion>
+                        <artifactId>servlet-api</artifactId>
+                        <groupId>javax.servlet</groupId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-hdfs</artifactId>
+                <version>2.7.1</version>
+                <exclusions>
+                    <exclusion>
+                        <artifactId>servlet-api</artifactId>
+                        <groupId>javax.servlet</groupId>
+                    </exclusion>
+                </exclusions>
+            </dependency>-->
         </dependencies>
     </dependencyManagement>
```
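
When `dfs.namenodes` lists more than one address, the `HdfsUtils` constructor takes its HA branch and derives the failover configuration from the first two entries. A hypothetical properties block for that case, with placeholder hostnames; note that in this version the rpc-address and failover-proxy keys are hardcoded to the `ns1` nameservice, so `dfs.nameservices` is best left at its default:

```yaml
# Hypothetical HA properties block for an hdfs-type knowledge base;
# nn1.example and nn2.example are illustrative hostnames.
properties:
  dfs.namenodes: nn1.example,nn2.example  # two entries select the HA branch
  dfs.port: 9000                          # applied to both rpc-addresses
  dfs.nameservices: ns1                   # keep the default; other keys assume ns1
  dfs.ha.namenodes.ns1: nn1,nn2
  dfs.replication: 1
  dfs.user: root
```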
