author     wangkuan <[email protected]>    2024-07-11 18:13:27 +0800
committer  wangkuan <[email protected]>    2024-07-11 18:13:27 +0800
commit     165e276cee3d34dda65ccd83c6fcbc59b092c3f8 (patch)
tree       018b33af1fe592a3655d5b4edc9747cdd5396870 /groot-common/src/main
parent     2fabfdb38c8850ec76ef457a252a7609f6bac454 (diff)
parent     76548454d29184d9a432bc2621b38a11f1b93999 (diff)
Merge branch 'develop' of https://git.mesalab.cn/galaxy/platform/groot-stream into feature/hdfs
# Conflicts:
#	groot-common/pom.xml
#	groot-common/src/main/java/com/geedgenetworks/common/Constants.java
#	groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
#	groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java
Diffstat (limited to 'groot-common/src/main')
-rw-r--r--    groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java    104
1 file changed, 104 insertions(+), 0 deletions(-)
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java b/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java
new file mode 100644
index 0000000..ccec51b
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java
@@ -0,0 +1,104 @@
+package com.geedgenetworks.common.utils;
+
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.shaded.com.google.common.base.Splitter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+@Slf4j
+public class HdfsUtils {
+
+    private final FileSystem fs;
+    private static volatile HdfsUtils hdfsInstance;
+
+    private HdfsUtils(Map<String, String> hdfsProp) {
+        Configuration config = new Configuration();
+        String paths = hdfsProp.getOrDefault("dfs.namenodes", "192.168.44.12");
+        String port = hdfsProp.getOrDefault("dfs.port", "9000");
+        String name = hdfsProp.getOrDefault("dfs.user", "root");
+        List<String> hadoopNameNodes = new ArrayList<>();
+        String defaultFS = "";
+        if (!paths.isEmpty()) {
+            try {
+                hadoopNameNodes = Splitter.on(",").trimResults().splitToList(paths);
+            } catch (RuntimeException e) {
+                throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The dfs.namenodes parameter is illegal");
+            }
+        }
+        if (hadoopNameNodes.size() == 1) {
+            // Single NameNode: point fs.defaultFS directly at its RPC address.
+            String replication = hdfsProp.getOrDefault("dfs.replication", "1");
+            defaultFS = "hdfs://" + hadoopNameNodes.get(0) + ":" + port;
+            config.set("fs.defaultFS", defaultFS);
+            config.set("dfs.replication", replication);
+        } else if (hadoopNameNodes.size() > 1) {
+            // HA pair: register both NameNodes under one nameservice so the client can fail over.
+            String replication = hdfsProp.getOrDefault("dfs.replication", "2");
+            String nameservices = hdfsProp.getOrDefault("dfs.nameservices", "ns1");
+            String ns1 = hdfsProp.getOrDefault("dfs.ha.namenodes.ns1", "nn1,nn2");
+            defaultFS = "hdfs://" + nameservices;
+            config.set("fs.defaultFS", defaultFS);
+            config.set("dfs.nameservices", nameservices);
+            config.set("dfs.ha.namenodes.ns1", ns1);
+            config.set("dfs.namenode.rpc-address.ns1.nn1", hadoopNameNodes.get(0) + ":" + port);
+            config.set("dfs.namenode.rpc-address.ns1.nn2", hadoopNameNodes.get(1) + ":" + port);
+            config.set("dfs.client.failover.proxy.provider.ns1", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
+            config.set("fs.AbstractFileSystem.hdfs.impl", "org.apache.hadoop.fs.Hdfs");
+            config.set("dfs.replication", replication);
+        }
+        try {
+            fs = FileSystem.get(new URI(defaultFS), config, name);
+        } catch (IOException | URISyntaxException | InterruptedException e) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "create hdfs fileSystem error", e);
+        }
+    }
+
+    public static synchronized HdfsUtils initHdfsUtilInstance(Map<String, String> hdfsProp) {
+        if (hdfsInstance == null) {
+            hdfsInstance = new HdfsUtils(hdfsProp);
+        }
+        return hdfsInstance;
+    }
+
+    public static HdfsUtils getHdfsUtilInstance() {
+        if (hdfsInstance == null) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "hdfs fileSystem has not been initialized");
+        }
+        return hdfsInstance;
+    }
+
+    // Reads the full contents of an HDFS file into a byte array.
+    public byte[] readBytes(Path hdfsPath) {
+        try {
+            if (!fs.exists(hdfsPath)) {
+                throw new GrootStreamRuntimeException(
+                        CommonErrorCode.FILE_OPERATION_ERROR, String.format("hdfs filepath [%s] does not exist", hdfsPath));
+            }
+            try (FSDataInputStream inputStream = fs.open(hdfsPath);
+                 ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+                byte[] buffer = new byte[1024];
+                int bytesRead;
+                while ((bytesRead = inputStream.read(buffer)) != -1) {
+                    outputStream.write(buffer, 0, bytesRead);
+                }
+                return outputStream.toByteArray();
+            }
+        } catch (IOException e) {
+            throw new GrootStreamRuntimeException(
+                    CommonErrorCode.FILE_OPERATION_ERROR, String.format("get hdfs file [%s] failed", hdfsPath), e);
+        }
+    }
+}
\ No newline at end of file
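
For reference, a minimal usage sketch of the new utility (not part of the commit), assuming the singleton is initialized once at job startup; the host names, port, and file path below are illustrative placeholders rather than values taken from this change.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;

import com.geedgenetworks.common.utils.HdfsUtils;

public class HdfsUtilsExample {
    public static void main(String[] args) {
        // Keys match the ones the HdfsUtils constructor reads; listing two
        // hosts in dfs.namenodes selects the HA branch of the config logic.
        Map<String, String> hdfsProp = new HashMap<>();
        hdfsProp.put("dfs.namenodes", "nn1.example.com,nn2.example.com");
        hdfsProp.put("dfs.port", "9000");
        hdfsProp.put("dfs.user", "root");
        hdfsProp.put("dfs.replication", "2");

        // Initialize once, then read through the singleton accessor.
        HdfsUtils.initHdfsUtilInstance(hdfsProp);
        byte[] content = HdfsUtils.getHdfsUtilInstance().readBytes(new Path("/data/example.bin"));
        System.out.printf("read %d bytes%n", content.length);
    }
}

Since initHdfsUtilInstance is synchronized and the instance field is volatile, concurrent callers share a single FileSystem client; note that only the first caller's properties take effect.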