| field | value | date |
|---|---|---|
| author | wangkuan <[email protected]> | 2024-07-11 18:13:27 +0800 |
| committer | wangkuan <[email protected]> | 2024-07-11 18:13:27 +0800 |
| commit | 165e276cee3d34dda65ccd83c6fcbc59b092c3f8 | |
| tree | 018b33af1fe592a3655d5b4edc9747cdd5396870 /groot-common/src/main | |
| parent | 2fabfdb38c8850ec76ef457a252a7609f6bac454 | |
| parent | 76548454d29184d9a432bc2621b38a11f1b93999 | |
Merge branch 'develop' of https://git.mesalab.cn/galaxy/platform/groot-stream into feature/hdfs
# Conflicts:
# groot-common/pom.xml
# groot-common/src/main/java/com/geedgenetworks/common/Constants.java
# groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
# groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java
Diffstat (limited to 'groot-common/src/main')
| mode | file | lines |
|---|---|---|
| -rw-r--r-- | groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java | 107 |

1 file changed, 107 insertions, 0 deletions
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java b/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java
new file mode 100644
index 0000000..ccec51b
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java
@@ -0,0 +1,107 @@
+package com.geedgenetworks.common.utils;
+
+import com.alibaba.fastjson2.JSONArray;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.shaded.com.google.common.base.Splitter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.*;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+@Slf4j
+public class HdfsUtils {
+
+    private static FileSystem fs;
+    private static HdfsUtils hdfsInstance;
+
+    private HdfsUtils(Map<String, String> hdfsProp) {
+        Configuration config = new Configuration();
+        String paths = hdfsProp.getOrDefault("dfs.namenodes","192.168.44.12");
+        String port = hdfsProp.getOrDefault("dfs.port","9000");
+        String name = hdfsProp.getOrDefault("dfs.user","root");
+        List<String> hadoopNameNodes = new ArrayList<>();
+        String defaultFS = "";
+        if (!paths.isEmpty()) {
+            try {
+                hadoopNameNodes = Splitter.on(",").trimResults().splitToList(paths);
+            } catch (RuntimeException e) {
+                throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param of dfs.namenodes is illegal");
+            }
+        }
+        if (hadoopNameNodes.size() == 1) {
+            String replication = hdfsProp.getOrDefault("dfs.replication","1");
+            defaultFS = "hdfs://" + hadoopNameNodes.get(0) + ":" + port;
+            config.set("fs.defaultFS", defaultFS);
+            config.set("dfs.replication", replication);
+        } else if (hadoopNameNodes.size() > 1) {
+            String replication = hdfsProp.getOrDefault("dfs.replication","2");
+            String nameservices = hdfsProp.getOrDefault("dfs.nameservices","ns1");
+            String ns1 = hdfsProp.getOrDefault("dfs.ha.namenodes.ns1","nn1,nn2");
+            defaultFS = "hdfs://" + nameservices;
+            config.set("fs.defaultFS", defaultFS);
+            config.set("dfs.nameservices", nameservices);
+            config.set("dfs.ha.namenodes.ns1", ns1);
+            config.set("dfs.namenode.rpc-address.ns1.nn1", hadoopNameNodes.get(0) + ":" + port);
+            config.set("dfs.namenode.rpc-address.ns1.nn2", hadoopNameNodes.get(1) + ":" + port);
+            config.set("dfs.client.failover.proxy.provider.ns1", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
+            config.set("fs.AbstractFileSystem.hdfs.impl", "org.apache.hadoop.fs.Hdfs");
+            config.set("dfs.replication", replication);
+        }
+        try {
+            fs = FileSystem.get(new URI(defaultFS), config, name);
+        } catch (IOException | URISyntaxException | InterruptedException e) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "create hdfs fileSystem error", e);
+        }
+    }
+
+    public static HdfsUtils initHdfsUtilInstance(Map<String, String> hdfsProp) {
+        if (hdfsInstance == null) {
+            hdfsInstance = new HdfsUtils(hdfsProp);
+        }
+        return hdfsInstance;
+    }
+    public static HdfsUtils getHdfsUtilInstance() {
+        if (hdfsInstance == null) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "create hdfs fileSystem error");
+        }
+        return hdfsInstance;
+    }
+    public byte[] readBytes(Path hdfsPath) {
+
+        try {
+            if (fs.exists(hdfsPath)) {
+                try (FSDataInputStream inputStream = fs.open(hdfsPath);
+                     ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+                    byte[] buffer = new byte[1024];
+                    int bytesRead;
+                    while ((bytesRead = inputStream.read(buffer)) != -1) {
+                        outputStream.write(buffer, 0, bytesRead);
+                    }
+                    return outputStream.toByteArray();
+                } catch (RuntimeException e) {
+                    String errorMsg = String.format("get hdfs file [%s] failed", hdfsPath);
+                    throw new GrootStreamRuntimeException(
+                            CommonErrorCode.FILE_OPERATION_ERROR, errorMsg, e);
+                }
+            }
+            throw new GrootStreamRuntimeException(
+                    CommonErrorCode.FILE_OPERATION_ERROR, String.format("hdfs filepath [%s] does not exist", hdfsPath));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file
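
For reference, a minimal usage sketch of the new class (not part of the commit): it assumes the caller passes connection settings as a plain `Map<String, String>`, as the constructor above expects, and the host, port, user, and file path below are placeholders. A single `dfs.namenodes` entry takes the non-HA branch of the constructor.

```java
import com.geedgenetworks.common.utils.HdfsUtils;
import org.apache.hadoop.fs.Path;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class HdfsUtilsUsageSketch {
    public static void main(String[] args) {
        // Placeholder connection properties; a single namenode entry selects
        // the non-HA branch of the HdfsUtils constructor.
        Map<String, String> props = new HashMap<>();
        props.put("dfs.namenodes", "10.0.0.1");
        props.put("dfs.port", "9000");
        props.put("dfs.user", "hdfs");

        // The first call constructs the singleton and the underlying FileSystem;
        // subsequent callers can retrieve it with getHdfsUtilInstance().
        HdfsUtils hdfs = HdfsUtils.initHdfsUtilInstance(props);

        // readBytes streams the whole file into memory and returns its bytes.
        byte[] raw = hdfs.readBytes(new Path("/tmp/example.json"));
        System.out.println(new String(raw, StandardCharsets.UTF_8));
    }
}
```

Since `initHdfsUtilInstance` only constructs the instance when it is still null, whichever component calls it first fixes the configuration for the whole JVM.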

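The constructor's second branch configures an HA client when `dfs.namenodes` lists more than one address. A sketch of the property map that would drive it, reusing the defaults visible in the diff (nameservice `ns1`, logical namenodes `nn1,nn2`); the addresses and port are illustrative and would replace `props` in the sketch above:

```java
// Hypothetical HA properties: two comma-separated namenodes select the HA
// branch, which maps them to dfs.namenode.rpc-address.ns1.nn1/nn2 and
// installs ConfiguredFailoverProxyProvider for client-side failover.
Map<String, String> haProps = new HashMap<>();
haProps.put("dfs.namenodes", "10.0.0.1,10.0.0.2");
haProps.put("dfs.port", "8020");
haProps.put("dfs.nameservices", "ns1");
haProps.put("dfs.ha.namenodes.ns1", "nn1,nn2");
haProps.put("dfs.replication", "2");
HdfsUtils.initHdfsUtilInstance(haProps);
```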