summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2024-07-11 18:18:48 +0800
committerwangkuan <[email protected]>2024-07-11 18:18:48 +0800
commit6bce97db6ace677e9d9e66bfa65a05604d9b3248 (patch)
tree6825616cb1a9b829ea6d45c35bcc96da7b6805dd
parent165e276cee3d34dda65ccd83c6fcbc59b092c3f8 (diff)
[improve][common]解决冲突
-rw-r--r--config/grootstream.yaml4
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java107
2 files changed, 0 insertions, 111 deletions
diff --git a/config/grootstream.yaml b/config/grootstream.yaml
index b48197a..31c2ae2 100644
--- a/config/grootstream.yaml
+++ b/config/grootstream.yaml
@@ -18,7 +18,3 @@ grootstream:
scheduler.knowledge_base.update.interval.minutes: 5
hadoop.dfs.namenodes: 192.168.44.12
hadoop.dfs.replication: 1
- #hadoop.dfs.port: 9000
- #hadoop.dfs.user: root
- #hadoop.dfs.nameservices: ns1
- #hadoop.dfs.ha.namenodes.ns1: nn1,nn2 \ No newline at end of file
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java b/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java
deleted file mode 100644
index ccec51b..0000000
--- a/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java
+++ /dev/null
@@ -1,107 +0,0 @@
-package com.geedgenetworks.common.utils;
-
-import com.alibaba.fastjson2.JSONArray;
-import com.geedgenetworks.common.exception.CommonErrorCode;
-import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
-import com.geedgenetworks.shaded.com.google.common.base.Splitter;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.*;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-@Slf4j
-public class HdfsUtils {
-
- private static FileSystem fs;
- private static HdfsUtils hdfsInstance;
-
- private HdfsUtils(Map<String, String> hdfsProp) {
- Configuration config = new Configuration();
- String paths = hdfsProp.getOrDefault("dfs.namenodes","192.168.44.12");
- String port = hdfsProp.getOrDefault("dfs.port","9000");
- String name = hdfsProp.getOrDefault("dfs.user","root");
- List<String> hadoopNameNodes = new ArrayList<>();
- String defaultFS = "";
- if (!paths.isEmpty()) {
- try {
- hadoopNameNodes= Splitter.on(",").trimResults().splitToList(paths);
- } catch (RuntimeException e) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param of dfs.namenodes is illegal");
- }
- }
- if (hadoopNameNodes.size() == 1) {
- String replication = hdfsProp.getOrDefault("dfs.replication","1");
- defaultFS = "hdfs://" + hadoopNameNodes.get(0) + ":" + port;
- config.set("fs.defaultFS", defaultFS);
- config.set("dfs.replication", replication);
- } else if (hadoopNameNodes.size() > 1) {
- String replication = hdfsProp.getOrDefault("dfs.replication","2");
- String nameservices = hdfsProp.getOrDefault("dfs.nameservices","ns1");
- String ns1 = hdfsProp.getOrDefault("dfs.ha.namenodes.ns1","nn1,nn2");
- defaultFS = "hdfs://" + nameservices;
- config.set("fs.defaultFS", defaultFS);
- config.set("dfs.nameservices", nameservices);
- config.set("dfs.ha.namenodes.ns1", ns1);
- config.set("dfs.namenode.rpc-address.ns1.nn1", hadoopNameNodes.get(0) + ":" + port);
- config.set("dfs.namenode.rpc-address.ns1.nn2", hadoopNameNodes.get(1) + ":" + port);
- config.set("dfs.client.failover.proxy.provider.ns1", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
- config.set("fs.AbstractFileSystem.hdfs.impl", "org.apache.hadoop.fs.Hdfs");
- config.set("dfs.replication", replication);
- }
- try {
- fs = FileSystem.get(new URI(defaultFS), config, name);
- } catch (IOException | URISyntaxException | InterruptedException e) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "create hdfs fileSystem error",e);
- }
- }
-
- public static HdfsUtils initHdfsUtilInstance(Map<String, String> hdfsProp) {
- if (hdfsInstance == null) {
- hdfsInstance = new HdfsUtils(hdfsProp);
- }
- return hdfsInstance;
- }
- public static HdfsUtils getHdfsUtilInstance() {
- if (hdfsInstance == null) {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "create hdfs fileSystem error");
- }
- return hdfsInstance;
- }
- public byte[] readBytes(Path hdfsPath) {
-
- try {
- if (fs.exists(hdfsPath)) {
- try (FSDataInputStream inputStream = fs.open(hdfsPath);
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
- byte[] buffer = new byte[1024];
- int bytesRead;
- while ((bytesRead = inputStream.read(buffer)) != -1) {
- outputStream.write(buffer, 0, bytesRead);
- }
- return outputStream.toByteArray();
- } catch (RuntimeException e) {
- String errorMsg = String.format("get hdfs file [%s] failed", hdfsPath);
- throw new GrootStreamRuntimeException(
- CommonErrorCode.FILE_OPERATION_ERROR, errorMsg, e);
- }
- }
- throw new GrootStreamRuntimeException(
- CommonErrorCode.FILE_OPERATION_ERROR, String.format("hdfs filepath [%s] is not exists", hdfsPath));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-} \ No newline at end of file