author    王宽 <[email protected]>  2024-07-09 05:31:11 +0000
committer 王宽 <[email protected]>  2024-07-09 05:31:11 +0000
commit    36cb5140d28ab103b4b9e035afbfe4dc9adf7880 (patch)
tree      a9a4fea1a06d95a068d7eb3cc37c19f7af8db28c /groot-common
parent    7cfc17b56ff27da9c0e488883447a11a465fc766 (diff)
Feature/hdfs
Diffstat (limited to 'groot-common')
-rw-r--r--  groot-common/pom.xml                                                        16
-rw-r--r--  groot-common/src/main/java/com/geedgenetworks/common/Constants.java          3
-rw-r--r--  groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java  107
3 files changed, 125 insertions(+), 1 deletion(-)
diff --git a/groot-common/pom.xml b/groot-common/pom.xml
index d6463cb..f00cfc9 100644
--- a/groot-common/pom.xml
+++ b/groot-common/pom.xml
@@ -86,7 +86,21 @@
             <scope>provided</scope>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
     </dependencies>
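
All three new artifacts take their version from the ${hadoop.version} property, which this hunk does not define; it is assumed to be set in this POM's <properties> or inherited from the parent. A minimal sketch with an illustrative version (note that hadoop-client already pulls in hadoop-common and the HDFS client transitively, so the explicit entries mainly pin the full hadoop-hdfs artifact):

    <properties>
        <!-- Assumed to be defined elsewhere in the build; 3.3.6 is illustrative, not from this commit. -->
        <hadoop.version>3.3.6</hadoop.version>
    </properties>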
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
index 46955cb..3c7d095 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/Constants.java
@@ -26,6 +26,9 @@ public final class Constants {
     public static final String SYSPROP_GROOTSTREAM_CONFIG = "grootstream.config";
     public static final String SYSPROP_GROOTSTREAM_PREFIX = "props.";
+    public static final String SYSPROP_GROOTSTREAM_HADOOP_PREFIX = "hadoop.";
+    public static final String SYSPROP_GROOTSTREAM_KB_SCHEDULER_INTERVAL_NAME = "scheduler.knowledge_base.update.interval.minutes";
+
     public static final String HAZELCAST_GROOTSTREAM_CONFIG_FILE_PREFIX = "grootstream";
     public static final String HAZELCAST_GROOTSTREAM_CONFIG_DEFAULT = "grootstream.yaml";
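
The hunk adds the "hadoop." prefix constant but does not show where it is consumed. A plausible sketch, mirroring the existing SYSPROP_GROOTSTREAM_PREFIX convention, is to strip the prefix from matching system properties and hand the remainder to HdfsUtils (hypothetical helper, not part of this commit):

    // Hypothetical helper: collect "hadoop."-prefixed system properties,
    // strip the prefix, and return the rest as HDFS configuration keys.
    public static Map<String, String> collectHadoopProps() {
        Map<String, String> props = new HashMap<>();
        for (String key : System.getProperties().stringPropertyNames()) {
            if (key.startsWith(Constants.SYSPROP_GROOTSTREAM_HADOOP_PREFIX)) {
                props.put(key.substring(Constants.SYSPROP_GROOTSTREAM_HADOOP_PREFIX.length()),
                        System.getProperty(key));
            }
        }
        return props;
    }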
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java b/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java
new file mode 100644
index 0000000..ccec51b
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java
@@ -0,0 +1,107 @@
+package com.geedgenetworks.common.utils;
+
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.shaded.com.google.common.base.Splitter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+@Slf4j
+public class HdfsUtils {
+
+    private static FileSystem fs;
+    private static HdfsUtils hdfsInstance;
+
+    private HdfsUtils(Map<String, String> hdfsProp) {
+        Configuration config = new Configuration();
+        String paths = hdfsProp.getOrDefault("dfs.namenodes", "192.168.44.12");
+        String port = hdfsProp.getOrDefault("dfs.port", "9000");
+        String name = hdfsProp.getOrDefault("dfs.user", "root");
+        List<String> hadoopNameNodes = new ArrayList<>();
+        String defaultFS = "";
+        if (!paths.isEmpty()) {
+            try {
+                hadoopNameNodes = Splitter.on(",").trimResults().splitToList(paths);
+            } catch (RuntimeException e) {
+                throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The dfs.namenodes parameter is illegal");
+            }
+        }
+        if (hadoopNameNodes.size() == 1) {
+            // Single namenode: point fs.defaultFS straight at it.
+            String replication = hdfsProp.getOrDefault("dfs.replication", "1");
+            defaultFS = "hdfs://" + hadoopNameNodes.get(0) + ":" + port;
+            config.set("fs.defaultFS", defaultFS);
+            config.set("dfs.replication", replication);
+        } else if (hadoopNameNodes.size() > 1) {
+            // HA setup: address the nameservice and register each namenode's RPC endpoint,
+            // deriving all configuration keys from the nameservice name instead of hardcoding "ns1".
+            String replication = hdfsProp.getOrDefault("dfs.replication", "2");
+            String nameservices = hdfsProp.getOrDefault("dfs.nameservices", "ns1");
+            String namenodeIds = hdfsProp.getOrDefault("dfs.ha.namenodes." + nameservices, "nn1,nn2");
+            List<String> nnIds = Splitter.on(",").trimResults().splitToList(namenodeIds);
+            defaultFS = "hdfs://" + nameservices;
+            config.set("fs.defaultFS", defaultFS);
+            config.set("dfs.nameservices", nameservices);
+            config.set("dfs.ha.namenodes." + nameservices, namenodeIds);
+            for (int i = 0; i < nnIds.size() && i < hadoopNameNodes.size(); i++) {
+                config.set("dfs.namenode.rpc-address." + nameservices + "." + nnIds.get(i), hadoopNameNodes.get(i) + ":" + port);
+            }
+            config.set("dfs.client.failover.proxy.provider." + nameservices, "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
+            config.set("fs.AbstractFileSystem.hdfs.impl", "org.apache.hadoop.fs.Hdfs");
+            config.set("dfs.replication", replication);
+        }
+        try {
+            fs = FileSystem.get(new URI(defaultFS), config, name);
+        } catch (IOException | URISyntaxException | InterruptedException e) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "create hdfs fileSystem error", e);
+        }
+    }
+
+    // synchronized so concurrent first calls cannot race on the singleton.
+    public static synchronized HdfsUtils initHdfsUtilInstance(Map<String, String> hdfsProp) {
+        if (hdfsInstance == null) {
+            hdfsInstance = new HdfsUtils(hdfsProp);
+        }
+        return hdfsInstance;
+    }
+
+    public static HdfsUtils getHdfsUtilInstance() {
+        if (hdfsInstance == null) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "HdfsUtils is not initialized; call initHdfsUtilInstance first");
+        }
+        return hdfsInstance;
+    }
+
+    public byte[] readBytes(Path hdfsPath) {
+        try {
+            if (fs.exists(hdfsPath)) {
+                try (FSDataInputStream inputStream = fs.open(hdfsPath);
+                     ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+                    byte[] buffer = new byte[1024];
+                    int bytesRead;
+                    while ((bytesRead = inputStream.read(buffer)) != -1) {
+                        outputStream.write(buffer, 0, bytesRead);
+                    }
+                    return outputStream.toByteArray();
+                }
+            }
+            throw new GrootStreamRuntimeException(
+                    CommonErrorCode.FILE_OPERATION_ERROR, String.format("hdfs file path [%s] does not exist", hdfsPath));
+        } catch (IOException e) {
+            throw new GrootStreamRuntimeException(
+                    CommonErrorCode.FILE_OPERATION_ERROR, String.format("read hdfs file [%s] failed", hdfsPath), e);
+        }
+    }
+}
\ No newline at end of file
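
A minimal usage sketch of the new utility (the host addresses and file path are illustrative, not from the commit):

    Map<String, String> hdfsProp = new HashMap<>();
    hdfsProp.put("dfs.namenodes", "10.0.0.1,10.0.0.2");  // two hosts trigger the HA branch
    hdfsProp.put("dfs.port", "9000");
    hdfsProp.put("dfs.user", "root");

    HdfsUtils.initHdfsUtilInstance(hdfsProp);            // first call builds the FileSystem
    byte[] bytes = HdfsUtils.getHdfsUtilInstance()
            .readBytes(new org.apache.hadoop.fs.Path("/data/kb/knowledge_base.bin"));

Note that the FileSystem handle lives in a static field, so every instance shares one connection; callers needing per-cluster handles would have to extend the class.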