author    王宽 <[email protected]>  2024-07-11 10:09:44 +0000
committer 王宽 <[email protected]>  2024-07-11 10:09:44 +0000
commit    70d53ccc1b474af41abefda6809bd337c5995e1e (patch)
tree      018b33af1fe592a3655d5b4edc9747cdd5396870
parent    2fabfdb38c8850ec76ef457a252a7609f6bac454 (diff)
parent    76548454d29184d9a432bc2621b38a11f1b93999 (diff)
Merge branch 'develop' into 'feature/hdfs'
# Conflicts:
#   groot-common/pom.xml
#   groot-common/src/main/java/com/geedgenetworks/common/Constants.java
#   groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
#   groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java
-rw-r--r--  .gitlab-ci.yml                                                               2
-rw-r--r--  config/grootstream.yaml                                                      6
-rw-r--r--  groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java  105
-rw-r--r--  pom.xml                                                                     10
4 files changed, 117 insertions, 6 deletions
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 12dfad6..abccdd8 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -37,7 +37,7 @@ deploy:
stage: deploy
script:
- echo "$MAVEN_SETTINGS_XML" > /usr/share/maven/conf/settings.xml
- - mvn $MAVEN_CLI_OPTS enforcer:enforce@release-version-check
+ # - mvn $MAVEN_CLI_OPTS enforcer:enforce@release-version-check
- |-
mvn $MAVEN_CLI_OPTS clean verify sonar:sonar -Dsonar.projectKey=$SONAR_PROJECT_KEY \
-Dsonar.host.url=$SONAR_HOST_URL -Dsonar.login=$SONAR_LOGIN_TOKEN \
diff --git a/config/grootstream.yaml b/config/grootstream.yaml
index e01fda3..b48197a 100644
--- a/config/grootstream.yaml
+++ b/config/grootstream.yaml
@@ -16,3 +16,9 @@ grootstream:
hos.bucket.name.traffic_file: traffic_file_bucket
hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket
scheduler.knowledge_base.update.interval.minutes: 5
+ hadoop.dfs.namenodes: 192.168.44.12
+ hadoop.dfs.replication: 1
+ #hadoop.dfs.port: 9000
+ #hadoop.dfs.user: root
+ #hadoop.dfs.nameservices: ns1
+ #hadoop.dfs.ha.namenodes.ns1: nn1,nn2
\ No newline at end of file
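The commented-out keys above show the HA variant. A hypothetical sketch of an HA configuration (the second namenode address is illustrative; HdfsUtils below splits dfs.namenodes on commas and treats two or more entries as an HA pair):

    grootstream:
      hadoop.dfs.namenodes: 192.168.44.12,192.168.44.13
      hadoop.dfs.replication: 2
      hadoop.dfs.port: 9000
      hadoop.dfs.user: root
      hadoop.dfs.nameservices: ns1
      hadoop.dfs.ha.namenodes.ns1: nn1,nn2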
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java b/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java
new file mode 100644
index 0000000..ccec51b
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java
@@ -0,0 +1,105 @@
+package com.geedgenetworks.common.utils;
+
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.shaded.com.google.common.base.Splitter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+@Slf4j
+public class HdfsUtils {
+
+ private static FileSystem fs;
+ private static HdfsUtils hdfsInstance;
+
+ private HdfsUtils(Map<String, String> hdfsProp) {
+ Configuration config = new Configuration();
+ String paths = hdfsProp.getOrDefault("dfs.namenodes", "192.168.44.12");
+ String port = hdfsProp.getOrDefault("dfs.port", "9000");
+ String name = hdfsProp.getOrDefault("dfs.user", "root");
+ List<String> hadoopNameNodes = new ArrayList<>();
+ String defaultFS = "";
+ if (!paths.isEmpty()) {
+ try {
+ hadoopNameNodes = Splitter.on(",").trimResults().splitToList(paths);
+ } catch (RuntimeException e) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The dfs.namenodes parameter is illegal", e);
+ }
+ }
+ if (hadoopNameNodes.size() == 1) {
+ // Single namenode: point fs.defaultFS directly at its RPC address.
+ String replication = hdfsProp.getOrDefault("dfs.replication", "1");
+ defaultFS = "hdfs://" + hadoopNameNodes.get(0) + ":" + port;
+ config.set("fs.defaultFS", defaultFS);
+ config.set("dfs.replication", replication);
+ } else if (hadoopNameNodes.size() > 1) {
+ // Multiple namenodes: configure an HA nameservice with a failover proxy provider.
+ String replication = hdfsProp.getOrDefault("dfs.replication", "2");
+ String nameservices = hdfsProp.getOrDefault("dfs.nameservices", "ns1");
+ String ns1 = hdfsProp.getOrDefault("dfs.ha.namenodes.ns1", "nn1,nn2");
+ defaultFS = "hdfs://" + nameservices;
+ config.set("fs.defaultFS", defaultFS);
+ config.set("dfs.nameservices", nameservices);
+ config.set("dfs.ha.namenodes.ns1", ns1);
+ config.set("dfs.namenode.rpc-address.ns1.nn1", hadoopNameNodes.get(0) + ":" + port);
+ config.set("dfs.namenode.rpc-address.ns1.nn2", hadoopNameNodes.get(1) + ":" + port);
+ config.set("dfs.client.failover.proxy.provider.ns1", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
+ config.set("fs.AbstractFileSystem.hdfs.impl", "org.apache.hadoop.fs.Hdfs");
+ config.set("dfs.replication", replication);
+ }
+ try {
+ fs = FileSystem.get(new URI(defaultFS), config, name);
+ } catch (IOException | URISyntaxException | InterruptedException e) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "Failed to create HDFS FileSystem", e);
+ }
+ }
+
+ public static synchronized HdfsUtils initHdfsUtilInstance(Map<String, String> hdfsProp) {
+ if (hdfsInstance == null) {
+ hdfsInstance = new HdfsUtils(hdfsProp);
+ }
+ return hdfsInstance;
+ }
+
+ public static HdfsUtils getHdfsUtilInstance() {
+ if (hdfsInstance == null) {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "HdfsUtils is not initialized; call initHdfsUtilInstance first");
+ }
+ return hdfsInstance;
+ }
+
+ public byte[] readBytes(Path hdfsPath) {
+ try {
+ if (!fs.exists(hdfsPath)) {
+ throw new GrootStreamRuntimeException(
+ CommonErrorCode.FILE_OPERATION_ERROR, String.format("hdfs filepath [%s] does not exist", hdfsPath));
+ }
+ // Read the whole file into memory in 1 KB chunks.
+ try (FSDataInputStream inputStream = fs.open(hdfsPath);
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+ byte[] buffer = new byte[1024];
+ int bytesRead;
+ while ((bytesRead = inputStream.read(buffer)) != -1) {
+ outputStream.write(buffer, 0, bytesRead);
+ }
+ return outputStream.toByteArray();
+ }
+ } catch (IOException e) {
+ throw new GrootStreamRuntimeException(
+ CommonErrorCode.FILE_OPERATION_ERROR, String.format("get hdfs file [%s] failed", hdfsPath), e);
+ }
+ }
+}
\ No newline at end of file
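A minimal usage sketch for the new utility, assuming the caller builds the property map from the grootstream.yaml keys above with the hadoop. prefix stripped (the class name and file path here are illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.fs.Path;
    import com.geedgenetworks.common.utils.HdfsUtils;

    public class HdfsUtilsExample {
        public static void main(String[] args) {
            Map<String, String> hdfsProp = new HashMap<>();
            hdfsProp.put("dfs.namenodes", "192.168.44.12"); // single namenode, as in grootstream.yaml
            hdfsProp.put("dfs.port", "9000");
            hdfsProp.put("dfs.user", "root");
            hdfsProp.put("dfs.replication", "1");

            // Initialize once at startup; later callers use the singleton accessor.
            HdfsUtils.initHdfsUtilInstance(hdfsProp);
            byte[] bytes = HdfsUtils.getHdfsUtilInstance().readBytes(new Path("/tmp/example.bin"));
            System.out.println("read " + bytes.length + " bytes");
        }
    }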
diff --git a/pom.xml b/pom.xml
index 88bf30b..77a6f3a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
</modules>
<properties>
- <revision>1.4.0</revision>
+ <revision>1.5.0-SNAPSHOT</revision>
<java.version>11</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>${java.version}</maven.compiler.source>
@@ -880,9 +880,9 @@
<repositories>
<repository>
- <id>nexus</id>
+ <id>nexus3</id>
<name>Team Nexus Repository</name>
- <url>http://192.168.40.153:8099/content/groups/public</url>
+ <url>http://192.168.40.153:8081/repository/public/</url>
</repository>
<repository>
<id>cloudera</id>
@@ -893,12 +893,12 @@
<distributionManagement>
<repository>
<id>platform-releases</id>
- <url>http://192.168.40.153:8099/content/repositories/platform-release</url>
+ <url>http://192.168.40.153:8081/repository/platform-release/</url>
<uniqueVersion>true</uniqueVersion>
</repository>
<snapshotRepository>
<id>platform-snapshots</id>
- <url>http://192.168.40.153:8099/content/repositories/platform-snapshot</url>
+ <url>http://192.168.40.153:8081/repository/platform-snapshot/</url>
</snapshotRepository>
</distributionManagement>
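Since <revision> is now 1.5.0-SNAPSHOT, Maven routes deploys from this branch to the platform-snapshots repository; dropping the -SNAPSHOT suffix routes artifacts to platform-releases instead. A sketch of the publish step, reusing the CI variable from .gitlab-ci.yml above:

    mvn $MAVEN_CLI_OPTS clean deploy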