| author | wangkuan <[email protected]> | 2024-07-11 18:13:27 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-07-11 18:13:27 +0800 |
| commit | 165e276cee3d34dda65ccd83c6fcbc59b092c3f8 (patch) | |
| tree | 018b33af1fe592a3655d5b4edc9747cdd5396870 | |
| parent | 2fabfdb38c8850ec76ef457a252a7609f6bac454 (diff) | |
| parent | 76548454d29184d9a432bc2621b38a11f1b93999 (diff) | |
Merge branch 'develop' of https://git.mesalab.cn/galaxy/platform/groot-stream into feature/hdfs
# Conflicts:
# groot-common/pom.xml
# groot-common/src/main/java/com/geedgenetworks/common/Constants.java
# groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
# groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/AbstractKnowledgeBaseHandler.java
| -rw-r--r-- | .gitlab-ci.yml | 2 |
| -rw-r--r-- | config/grootstream.yaml | 6 |
| -rw-r--r-- | groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java | 107 |
| -rw-r--r-- | pom.xml | 10 |
4 files changed, 119 insertions, 6 deletions
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 12dfad6..abccdd8 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -37,7 +37,7 @@ deploy:
   stage: deploy
   script:
     - echo "$MAVEN_SETTINGS_XML" > /usr/share/maven/conf/settings.xml
-    - mvn $MAVEN_CLI_OPTS enforcer:enforce@release-version-check
+    # - mvn $MAVEN_CLI_OPTS enforcer:enforce@release-version-check
     - |-
       mvn $MAVEN_CLI_OPTS clean verify sonar:sonar -Dsonar.projectKey=$SONAR_PROJECT_KEY \
         -Dsonar.host.url=$SONAR_HOST_URL -Dsonar.login=$SONAR_LOGIN_TOKEN \
diff --git a/config/grootstream.yaml b/config/grootstream.yaml
index e01fda3..b48197a 100644
--- a/config/grootstream.yaml
+++ b/config/grootstream.yaml
@@ -16,3 +16,9 @@ grootstream:
   hos.bucket.name.traffic_file: traffic_file_bucket
   hos.bucket.name.troubleshooting_file: troubleshooting_file_bucket
   scheduler.knowledge_base.update.interval.minutes: 5
+  hadoop.dfs.namenodes: 192.168.44.12
+  hadoop.dfs.replication: 1
+  #hadoop.dfs.port: 9000
+  #hadoop.dfs.user: root
+  #hadoop.dfs.nameservices: ns1
+  #hadoop.dfs.ha.namenodes.ns1: nn1,nn2
\ No newline at end of file
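The new `hadoop.dfs.*` keys above feed the `HdfsUtils` class introduced in the next file, which reads keys without the `hadoop.` prefix (e.g. `dfs.namenodes`, `dfs.port`). A minimal sketch of that wiring, assuming the platform strips the prefix when building the property map; the helper shown here is hypothetical and not part of this commit:

```java
import java.util.HashMap;
import java.util.Map;

public class HdfsPropsWiring {
    // Hypothetical helper (an assumption, not in this commit): drop the
    // "hadoop." prefix from grootstream.yaml keys so they match the
    // "dfs.*" keys HdfsUtils reads via getOrDefault.
    static Map<String, String> toHdfsProps(Map<String, String> yamlProps) {
        Map<String, String> hdfsProp = new HashMap<>();
        for (Map.Entry<String, String> e : yamlProps.entrySet()) {
            if (e.getKey().startsWith("hadoop.")) {
                hdfsProp.put(e.getKey().substring("hadoop.".length()), e.getValue());
            }
        }
        return hdfsProp;
    }

    public static void main(String[] args) {
        Map<String, String> yamlProps = new HashMap<>();
        yamlProps.put("hadoop.dfs.namenodes", "192.168.44.12"); // single namenode
        yamlProps.put("hadoop.dfs.replication", "1");

        // Prints {dfs.namenodes=192.168.44.12, dfs.replication=1}
        System.out.println(toHdfsProps(yamlProps));
    }
}
```

Note that a single value in `dfs.namenodes` selects the single-namenode branch of `HdfsUtils`; a comma-separated list of two hosts (together with the commented-out `dfs.nameservices` / `dfs.ha.namenodes.ns1` keys) selects the HA configuration branch.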
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java b/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java
new file mode 100644
index 0000000..ccec51b
--- /dev/null
+++ b/groot-common/src/main/java/com/geedgenetworks/common/utils/HdfsUtils.java
@@ -0,0 +1,107 @@
+package com.geedgenetworks.common.utils;
+
+import com.alibaba.fastjson2.JSONArray;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.shaded.com.google.common.base.Splitter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.*;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+@Slf4j
+public class HdfsUtils {
+
+    private static FileSystem fs;
+    private static HdfsUtils hdfsInstance;
+
+    private HdfsUtils(Map<String, String> hdfsProp) {
+        Configuration config = new Configuration();
+        String paths = hdfsProp.getOrDefault("dfs.namenodes", "192.168.44.12");
+        String port = hdfsProp.getOrDefault("dfs.port", "9000");
+        String name = hdfsProp.getOrDefault("dfs.user", "root");
+        List<String> hadoopNameNodes = new ArrayList<>();
+        String defaultFS = "";
+        if (!paths.isEmpty()) {
+            try {
+                hadoopNameNodes = Splitter.on(",").trimResults().splitToList(paths);
+            } catch (RuntimeException e) {
+                throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param of dfs.namenodes is illegal");
+            }
+        }
+        if (hadoopNameNodes.size() == 1) {
+            String replication = hdfsProp.getOrDefault("dfs.replication", "1");
+            defaultFS = "hdfs://" + hadoopNameNodes.get(0) + ":" + port;
+            config.set("fs.defaultFS", defaultFS);
+            config.set("dfs.replication", replication);
+        } else if (hadoopNameNodes.size() > 1) {
+            String replication = hdfsProp.getOrDefault("dfs.replication", "2");
+            String nameservices = hdfsProp.getOrDefault("dfs.nameservices", "ns1");
+            String ns1 = hdfsProp.getOrDefault("dfs.ha.namenodes.ns1", "nn1,nn2");
+            defaultFS = "hdfs://" + nameservices;
+            config.set("fs.defaultFS", defaultFS);
+            config.set("dfs.nameservices", nameservices);
+            config.set("dfs.ha.namenodes.ns1", ns1);
+            config.set("dfs.namenode.rpc-address.ns1.nn1", hadoopNameNodes.get(0) + ":" + port);
+            config.set("dfs.namenode.rpc-address.ns1.nn2", hadoopNameNodes.get(1) + ":" + port);
+            config.set("dfs.client.failover.proxy.provider.ns1", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
+            config.set("fs.AbstractFileSystem.hdfs.impl", "org.apache.hadoop.fs.Hdfs");
+            config.set("dfs.replication", replication);
+        }
+        try {
+            fs = FileSystem.get(new URI(defaultFS), config, name);
+        } catch (IOException | URISyntaxException | InterruptedException e) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "create hdfs fileSystem error", e);
+        }
+    }
+
+    public static HdfsUtils initHdfsUtilInstance(Map<String, String> hdfsProp) {
+        if (hdfsInstance == null) {
+            hdfsInstance = new HdfsUtils(hdfsProp);
+        }
+        return hdfsInstance;
+    }
+
+    public static HdfsUtils getHdfsUtilInstance() {
+        if (hdfsInstance == null) {
+            throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "create hdfs fileSystem error");
+        }
+        return hdfsInstance;
+    }
+
+    public byte[] readBytes(Path hdfsPath) {
+        try {
+            if (fs.exists(hdfsPath)) {
+                try (FSDataInputStream inputStream = fs.open(hdfsPath);
+                     ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+                    byte[] buffer = new byte[1024];
+                    int bytesRead;
+                    while ((bytesRead = inputStream.read(buffer)) != -1) {
+                        outputStream.write(buffer, 0, bytesRead);
+                    }
+                    return outputStream.toByteArray();
+                } catch (RuntimeException e) {
+                    String errorMsg = String.format("get hdfs file [%s] failed", hdfsPath);
+                    throw new GrootStreamRuntimeException(
+                            CommonErrorCode.FILE_OPERATION_ERROR, errorMsg, e);
+                }
+            }
+            throw new GrootStreamRuntimeException(
+                    CommonErrorCode.FILE_OPERATION_ERROR, String.format("hdfs filepath [%s] is not exists", hdfsPath));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
   </modules>
 
   <properties>
-    <revision>1.4.0</revision>
+    <revision>1.5.0-SNAPSHOT</revision>
     <java.version>11</java.version>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <maven.compiler.source>${java.version}</maven.compiler.source>
@@ -880,9 +880,9 @@
   <repositories>
     <repository>
-      <id>nexus</id>
+      <id>nexus3</id>
       <name>Team Nexus Repository</name>
-      <url>http://192.168.40.153:8099/content/groups/public</url>
+      <url>http://192.168.40.153:8081/repository/public/</url>
     </repository>
     <repository>
       <id>cloudera</id>
@@ -893,12 +893,12 @@
   <distributionManagement>
     <repository>
       <id>platform-releases</id>
-      <url>http://192.168.40.153:8099/content/repositories/platform-release</url>
+      <url>http://192.168.40.153:8081/repository/platform-release/</url>
       <uniqueVersion>true</uniqueVersion>
     </repository>
     <snapshotRepository>
       <id>platform-snapshots</id>
-      <url>http://192.168.40.153:8099/content/repositories/platform-snapshot</url>
+      <url>http://192.168.40.153:8081/repository/platform-snapshot/</url>
     </snapshotRepository>
   </distributionManagement>
