diff options
| author | unknown <[email protected]> | 2022-11-28 16:42:56 +0800 |
|---|---|---|
| committer | unknown <[email protected]> | 2022-11-28 16:42:56 +0800 |
| commit | 0662d265dd05e94c021dbd03d75ee67ffcb61ae0 (patch) | |
| tree | c9e09daebda11329d702a1cc29800c079a4e9650 | |
| parent | 87fe11dc93f55ae273062f29823f04685675218d (diff) | |
GAL-224 取消SSL检测,新增HDFS高可用设置knowledge
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/HdfsUtils.java | 27 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/HttpClientUtils2.java | 192 |
2 files changed, 62 insertions, 157 deletions
diff --git a/src/main/java/com/zdjizhi/utils/HdfsUtils.java b/src/main/java/com/zdjizhi/utils/HdfsUtils.java index 25f90bf..c1e4021 100644 --- a/src/main/java/com/zdjizhi/utils/HdfsUtils.java +++ b/src/main/java/com/zdjizhi/utils/HdfsUtils.java @@ -22,28 +22,23 @@ public class HdfsUtils { static { Configuration configuration = new Configuration(); try { - //指定用户 //配置hdfs相关信息 -// configuration.set("fs.defaultFS","hdfs://ns1"); -// configuration.set("hadoop.proxyuser.root.hosts","*"); -// configuration.set("hadoop.proxyuser.root.groups","*"); -// configuration.set("ha.zookeeper.quorum", CommonConfig.HBASE_ZOOKEEPER_QUORUM); -// configuration.set("dfs.nameservices","ns1"); -// configuration.set("dfs.ha.namenodes.ns1","nn1,nn2"); -// configuration.set("dfs.namenode.rpc-address.ns1.nn1",CommonConfig.HDFS_URI_NS1); -// configuration.set("dfs.namenode.rpc-address.ns1.nn2",CommonConfig.HDFS_URI_NS2); -// configuration.set("dfs.client.failover.proxy.provider.ns1","org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); -// //创建fileSystem,用于连接hdfs -//// fileSystem = FileSystem.get(configuration); + configuration.set("fs.defaultFS","hdfs://ns1"); + configuration.set("hadoop.proxyuser.root.hosts","*"); + configuration.set("hadoop.proxyuser.root.groups","*"); + configuration.set("ha.zookeeper.quorum", CommonConfig.HBASE_ZOOKEEPER_QUORUM); + configuration.set("dfs.nameservices","ns1"); + configuration.set("dfs.ha.namenodes.ns1","nn1,nn2"); + configuration.set("dfs.namenode.rpc-address.ns1.nn1",CommonConfig.HDFS_URI_NS1); + configuration.set("dfs.namenode.rpc-address.ns1.nn2",CommonConfig.HDFS_URI_NS2); + configuration.set("dfs.client.failover.proxy.provider.ns1","org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); + //指定用户 System.setProperty("HADOOP_USER_NAME", CommonConfig.HDFS_USER); //创建fileSystem,用于连接hdfs - fileSystem = FileSystem.get(new URI(CommonConfig.HDFS_URI_NS1),configuration); + fileSystem = FileSystem.get(configuration); } catch (IOException e) { throw new RuntimeException(e); } - catch (URISyntaxException e) { - e.printStackTrace(); - } } public static boolean isExists(String filePath) throws IOException { diff --git a/src/main/java/com/zdjizhi/utils/HttpClientUtils2.java b/src/main/java/com/zdjizhi/utils/HttpClientUtils2.java index ddfd210..1136e6d 100644 --- a/src/main/java/com/zdjizhi/utils/HttpClientUtils2.java +++ b/src/main/java/com/zdjizhi/utils/HttpClientUtils2.java @@ -11,9 +11,15 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.client.utils.URIBuilder; +import org.apache.http.config.Registry; +import org.apache.http.config.RegistryBuilder; import org.apache.http.conn.ConnectTimeoutException; import org.apache.http.conn.ConnectionKeepAliveStrategy; import org.apache.http.conn.HttpHostConnectException; +import org.apache.http.conn.socket.ConnectionSocketFactory; +import org.apache.http.conn.socket.PlainConnectionSocketFactory; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.entity.ByteArrayEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; @@ -24,21 +30,22 @@ import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLException; -import javax.net.ssl.SSLHandshakeException; +import javax.net.ssl.*; import java.io.IOException; import java.io.InputStream; import java.io.InterruptedIOException; import java.net.URI; import java.net.UnknownHostException; import java.nio.charset.StandardCharsets; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.X509Certificate; import java.util.Map; import static org.apache.kafka.common.requests.FetchMetadata.log; /** * http client工具类 - * @author wlh */ public class HttpClientUtils2 { /** 全局连接池对象 */ @@ -60,10 +67,49 @@ public class HttpClientUtils2 { } /** + * 在调用SSL之前需要重写验证方法,取消检测SSL + * 创建ConnectionManager,添加Connection配置信息 + * + * @return HttpClient 支持https + */ + private PoolingHttpClientConnectionManager getSslClientManager() { + try { + // 在调用SSL之前需要重写验证方法,取消检测SSL + X509TrustManager trustManager = new X509TrustManager() { + @Override + public X509Certificate[] getAcceptedIssuers() { + return null; + } + @Override + public void checkClientTrusted(X509Certificate[] xcs, String str) { + } + @Override + public void checkServerTrusted(X509Certificate[] xcs, String str) { + } + }; + SSLContext ctx = SSLContext.getInstance(SSLConnectionSocketFactory.TLS); + ctx.init(null, new TrustManager[]{trustManager}, null); + SSLConnectionSocketFactory socketFactory = new SSLConnectionSocketFactory(ctx, NoopHostnameVerifier.INSTANCE); + Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create() + .register("http", PlainConnectionSocketFactory.INSTANCE) + .register("https", socketFactory).build(); + // 创建ConnectionManager,添加Connection配置信息 + PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry); + // 设置最大连接数 + connManager.setMaxTotal(CommonConfig.HTTP_POOL_MAX_CONNECTION); + // 设置每个连接的路由数 + connManager.setDefaultMaxPerRoute(CommonConfig.HTTP_POOL_MAX_PER_ROUTE); + return connManager; + } catch (KeyManagementException | NoSuchAlgorithmException e) { + throw new RuntimeException(e.getMessage()); + } + } + + /** * 获取Http客户端连接对象 * @return Http客户端连接对象 */ - private static CloseableHttpClient getHttpClient() { + private CloseableHttpClient getHttpClient() { // 创建Http请求配置参数 RequestConfig requestConfig = RequestConfig.custom() // 获取连接超时时间 @@ -133,146 +179,10 @@ public class HttpClientUtils2 { .setRetryHandler(retry) .setKeepAliveStrategy(myStrategy) // 配置连接池管理对象 - .setConnectionManager(CONN_MANAGER) + .setConnectionManager(getSslClientManager()) .build(); } - - /** - * GET请求 - * - * @param uri 请求地 - * @return message - */ - public static String httpGet(URI uri, Header... headers) { - String msg = ERROR_MESSAGE; - - // 获取客户端连接对象 - CloseableHttpClient httpClient = getHttpClient(); - CloseableHttpResponse response = null; - - try { - logger.info("http get uri {}",uri); - // 创建GET请求对象 - HttpGet httpGet = new HttpGet(uri); - - if (StringUtil.isNotEmpty(headers)) { - for (Header h : headers) { - httpGet.addHeader(h); - logger.info("request header : {}",h); - } - } - // 执行请求 - response = httpClient.execute(httpGet); - int statusCode = response.getStatusLine().getStatusCode(); - // 获取响应实体 - HttpEntity entity = response.getEntity(); - // 获取响应信息 - msg = EntityUtils.toString(entity, "UTF-8"); - - if (statusCode != HttpStatus.SC_OK) { - logger.error("Http get content is :{}" , msg); - } - - } catch (ClientProtocolException e) { - logger.error("协议错误: {}", e.getMessage()); - } catch (ParseException e) { - logger.error("解析错误: {}", e.getMessage()); - } catch (IOException e) { - logger.error("IO错误: {}",e.getMessage()); - } finally { - if (null != response) { - try { - EntityUtils.consume(response.getEntity()); - response.close(); - } catch (IOException e) { - logger.error("释放链接错误: {}", e.getMessage()); - - } - } - } - - return msg; - } - /** - * POST 请求 - * @param uri uri参数 - * @param requestBody 请求体 - * @return post请求返回结果 - */ - public static String httpPost(URI uri, String requestBody, Header... headers) { - String msg = ERROR_MESSAGE; - // 获取客户端连接对象 - CloseableHttpClient httpClient = getHttpClient(); - - // 创建POST请求对象 - CloseableHttpResponse response = null; - try { - - logger.info("http post uri:{}, http post body:{}", uri, requestBody); - - HttpPost httpPost = new HttpPost(uri); - httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded"); - if (StringUtil.isNotEmpty(headers)) { - for (Header h : headers) { - httpPost.addHeader(h); - logger.info("request header : {}",h); - } - } - - if(StringUtil.isNotBlank(requestBody)) { - byte[] bytes = requestBody.getBytes(StandardCharsets.UTF_8); - httpPost.setEntity(new ByteArrayEntity(bytes)); - } - - response = httpClient.execute(httpPost); - int statusCode = response.getStatusLine().getStatusCode(); - // 获取响应实体 - HttpEntity entity = response.getEntity(); - // 获取响应信息 - msg = EntityUtils.toString(entity, "UTF-8"); - - if (statusCode != HttpStatus.SC_OK) { - logger.error("Http post content is :{}" , msg); - } - } catch (ClientProtocolException e) { - logger.error("协议错误: {}", e.getMessage()); - } catch (ParseException e) { - logger.error("解析错误: {}", e.getMessage()); - } catch (IOException e) { - logger.error("IO错误: {}", e.getMessage()); - } finally { - if (null != response) { - try { - EntityUtils.consumeQuietly(response.getEntity()); - response.close(); - } catch (IOException e) { - logger.error("释放链接错误: {}", e.getMessage()); - - } - } - } - return msg; - } - - /** - * 拼装url - * url ,参数map - */ - public static void setUrlWithParams(URIBuilder uriBuilder,String path, Map<String, Object> params) { - try { - uriBuilder.setPath(path); - if (params != null && !params.isEmpty()){ - for (Map.Entry<String, Object> kv : params.entrySet()) { - uriBuilder.setParameter(kv.getKey(),kv.getValue().toString()); - } - } - } catch (Exception e) { - logger.error("拼接url出错,uri : {}, path : {},参数: {}",uriBuilder.toString(),path,params); - } - } - - // TODO: 2022/10/19 加载知识库 public InputStream httpGetInputStream(String url, int socketTimeout, Header... headers) { InputStream result = null; |
