summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <[email protected]>2022-11-28 16:42:56 +0800
committerunknown <[email protected]>2022-11-28 16:42:56 +0800
commit0662d265dd05e94c021dbd03d75ee67ffcb61ae0 (patch)
treec9e09daebda11329d702a1cc29800c079a4e9650
parent87fe11dc93f55ae273062f29823f04685675218d (diff)
GAL-224 取消SSL检测,新增HDFS高可用设置knowledge
-rw-r--r--src/main/java/com/zdjizhi/utils/HdfsUtils.java27
-rw-r--r--src/main/java/com/zdjizhi/utils/HttpClientUtils2.java192
2 files changed, 62 insertions, 157 deletions
diff --git a/src/main/java/com/zdjizhi/utils/HdfsUtils.java b/src/main/java/com/zdjizhi/utils/HdfsUtils.java
index 25f90bf..c1e4021 100644
--- a/src/main/java/com/zdjizhi/utils/HdfsUtils.java
+++ b/src/main/java/com/zdjizhi/utils/HdfsUtils.java
@@ -22,28 +22,23 @@ public class HdfsUtils {
static {
Configuration configuration = new Configuration();
try {
- //指定用户
//配置hdfs相关信息
-// configuration.set("fs.defaultFS","hdfs://ns1");
-// configuration.set("hadoop.proxyuser.root.hosts","*");
-// configuration.set("hadoop.proxyuser.root.groups","*");
-// configuration.set("ha.zookeeper.quorum", CommonConfig.HBASE_ZOOKEEPER_QUORUM);
-// configuration.set("dfs.nameservices","ns1");
-// configuration.set("dfs.ha.namenodes.ns1","nn1,nn2");
-// configuration.set("dfs.namenode.rpc-address.ns1.nn1",CommonConfig.HDFS_URI_NS1);
-// configuration.set("dfs.namenode.rpc-address.ns1.nn2",CommonConfig.HDFS_URI_NS2);
-// configuration.set("dfs.client.failover.proxy.provider.ns1","org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
-// //创建fileSystem,用于连接hdfs
-//// fileSystem = FileSystem.get(configuration);
+ configuration.set("fs.defaultFS","hdfs://ns1");
+ configuration.set("hadoop.proxyuser.root.hosts","*");
+ configuration.set("hadoop.proxyuser.root.groups","*");
+ configuration.set("ha.zookeeper.quorum", CommonConfig.HBASE_ZOOKEEPER_QUORUM);
+ configuration.set("dfs.nameservices","ns1");
+ configuration.set("dfs.ha.namenodes.ns1","nn1,nn2");
+ configuration.set("dfs.namenode.rpc-address.ns1.nn1",CommonConfig.HDFS_URI_NS1);
+ configuration.set("dfs.namenode.rpc-address.ns1.nn2",CommonConfig.HDFS_URI_NS2);
+ configuration.set("dfs.client.failover.proxy.provider.ns1","org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
+ //指定用户
System.setProperty("HADOOP_USER_NAME", CommonConfig.HDFS_USER);
//创建fileSystem,用于连接hdfs
- fileSystem = FileSystem.get(new URI(CommonConfig.HDFS_URI_NS1),configuration);
+ fileSystem = FileSystem.get(configuration);
} catch (IOException e) {
throw new RuntimeException(e);
}
- catch (URISyntaxException e) {
- e.printStackTrace();
- }
}
public static boolean isExists(String filePath) throws IOException {
diff --git a/src/main/java/com/zdjizhi/utils/HttpClientUtils2.java b/src/main/java/com/zdjizhi/utils/HttpClientUtils2.java
index ddfd210..1136e6d 100644
--- a/src/main/java/com/zdjizhi/utils/HttpClientUtils2.java
+++ b/src/main/java/com/zdjizhi/utils/HttpClientUtils2.java
@@ -11,9 +11,15 @@ import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.conn.HttpHostConnectException;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.socket.PlainConnectionSocketFactory;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
@@ -24,21 +30,22 @@ import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.SSLException;
-import javax.net.ssl.SSLHandshakeException;
+import javax.net.ssl.*;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.X509Certificate;
import java.util.Map;
import static org.apache.kafka.common.requests.FetchMetadata.log;
/**
* http client工具类
- * @author wlh
*/
public class HttpClientUtils2 {
/** 全局连接池对象 */
@@ -60,10 +67,49 @@ public class HttpClientUtils2 {
}
/**
+ * 在调用SSL之前需要重写验证方法,取消检测SSL
+ * 创建ConnectionManager,添加Connection配置信息
+ *
+ * @return HttpClient 支持https
+ */
+ private PoolingHttpClientConnectionManager getSslClientManager() {
+ try {
+ // 在调用SSL之前需要重写验证方法,取消检测SSL
+ X509TrustManager trustManager = new X509TrustManager() {
+ @Override
+ public X509Certificate[] getAcceptedIssuers() {
+ return null;
+ }
+ @Override
+ public void checkClientTrusted(X509Certificate[] xcs, String str) {
+ }
+ @Override
+ public void checkServerTrusted(X509Certificate[] xcs, String str) {
+ }
+ };
+ SSLContext ctx = SSLContext.getInstance(SSLConnectionSocketFactory.TLS);
+ ctx.init(null, new TrustManager[]{trustManager}, null);
+ SSLConnectionSocketFactory socketFactory = new SSLConnectionSocketFactory(ctx, NoopHostnameVerifier.INSTANCE);
+ Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
+ .register("http", PlainConnectionSocketFactory.INSTANCE)
+ .register("https", socketFactory).build();
+ // 创建ConnectionManager,添加Connection配置信息
+ PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
+ // 设置最大连接数
+ connManager.setMaxTotal(CommonConfig.HTTP_POOL_MAX_CONNECTION);
+ // 设置每个连接的路由数
+ connManager.setDefaultMaxPerRoute(CommonConfig.HTTP_POOL_MAX_PER_ROUTE);
+ return connManager;
+ } catch (KeyManagementException | NoSuchAlgorithmException e) {
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+ /**
* 获取Http客户端连接对象
* @return Http客户端连接对象
*/
- private static CloseableHttpClient getHttpClient() {
+ private CloseableHttpClient getHttpClient() {
// 创建Http请求配置参数
RequestConfig requestConfig = RequestConfig.custom()
// 获取连接超时时间
@@ -133,146 +179,10 @@ public class HttpClientUtils2 {
.setRetryHandler(retry)
.setKeepAliveStrategy(myStrategy)
// 配置连接池管理对象
- .setConnectionManager(CONN_MANAGER)
+ .setConnectionManager(getSslClientManager())
.build();
}
-
- /**
- * GET请求
- *
- * @param uri 请求地
- * @return message
- */
- public static String httpGet(URI uri, Header... headers) {
- String msg = ERROR_MESSAGE;
-
- // 获取客户端连接对象
- CloseableHttpClient httpClient = getHttpClient();
- CloseableHttpResponse response = null;
-
- try {
- logger.info("http get uri {}",uri);
- // 创建GET请求对象
- HttpGet httpGet = new HttpGet(uri);
-
- if (StringUtil.isNotEmpty(headers)) {
- for (Header h : headers) {
- httpGet.addHeader(h);
- logger.info("request header : {}",h);
- }
- }
- // 执行请求
- response = httpClient.execute(httpGet);
- int statusCode = response.getStatusLine().getStatusCode();
- // 获取响应实体
- HttpEntity entity = response.getEntity();
- // 获取响应信息
- msg = EntityUtils.toString(entity, "UTF-8");
-
- if (statusCode != HttpStatus.SC_OK) {
- logger.error("Http get content is :{}" , msg);
- }
-
- } catch (ClientProtocolException e) {
- logger.error("协议错误: {}", e.getMessage());
- } catch (ParseException e) {
- logger.error("解析错误: {}", e.getMessage());
- } catch (IOException e) {
- logger.error("IO错误: {}",e.getMessage());
- } finally {
- if (null != response) {
- try {
- EntityUtils.consume(response.getEntity());
- response.close();
- } catch (IOException e) {
- logger.error("释放链接错误: {}", e.getMessage());
-
- }
- }
- }
-
- return msg;
- }
- /**
- * POST 请求
- * @param uri uri参数
- * @param requestBody 请求体
- * @return post请求返回结果
- */
- public static String httpPost(URI uri, String requestBody, Header... headers) {
- String msg = ERROR_MESSAGE;
- // 获取客户端连接对象
- CloseableHttpClient httpClient = getHttpClient();
-
- // 创建POST请求对象
- CloseableHttpResponse response = null;
- try {
-
- logger.info("http post uri:{}, http post body:{}", uri, requestBody);
-
- HttpPost httpPost = new HttpPost(uri);
- httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded");
- if (StringUtil.isNotEmpty(headers)) {
- for (Header h : headers) {
- httpPost.addHeader(h);
- logger.info("request header : {}",h);
- }
- }
-
- if(StringUtil.isNotBlank(requestBody)) {
- byte[] bytes = requestBody.getBytes(StandardCharsets.UTF_8);
- httpPost.setEntity(new ByteArrayEntity(bytes));
- }
-
- response = httpClient.execute(httpPost);
- int statusCode = response.getStatusLine().getStatusCode();
- // 获取响应实体
- HttpEntity entity = response.getEntity();
- // 获取响应信息
- msg = EntityUtils.toString(entity, "UTF-8");
-
- if (statusCode != HttpStatus.SC_OK) {
- logger.error("Http post content is :{}" , msg);
- }
- } catch (ClientProtocolException e) {
- logger.error("协议错误: {}", e.getMessage());
- } catch (ParseException e) {
- logger.error("解析错误: {}", e.getMessage());
- } catch (IOException e) {
- logger.error("IO错误: {}", e.getMessage());
- } finally {
- if (null != response) {
- try {
- EntityUtils.consumeQuietly(response.getEntity());
- response.close();
- } catch (IOException e) {
- logger.error("释放链接错误: {}", e.getMessage());
-
- }
- }
- }
- return msg;
- }
-
- /**
- * 拼装url
- * url ,参数map
- */
- public static void setUrlWithParams(URIBuilder uriBuilder,String path, Map<String, Object> params) {
- try {
- uriBuilder.setPath(path);
- if (params != null && !params.isEmpty()){
- for (Map.Entry<String, Object> kv : params.entrySet()) {
- uriBuilder.setParameter(kv.getKey(),kv.getValue().toString());
- }
- }
- } catch (Exception e) {
- logger.error("拼接url出错,uri : {}, path : {},参数: {}",uriBuilder.toString(),path,params);
- }
- }
-
-
// TODO: 2022/10/19 加载知识库
public InputStream httpGetInputStream(String url, int socketTimeout, Header... headers) {
InputStream result = null;