diff options
| -rw-r--r-- | config/application.yml | 2 | ||||
| -rw-r--r-- | pom.xml | 2 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/syncfile/FileSyncServiceApplication.java | 1 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/syncfile/config/HttpClientPool.java | 118 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/syncfile/config/KafkaConsumer.java | 4 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/syncfile/core/SyncFiles.java | 3 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/syncfile/utils/HttpUtil.java | 17 |
7 files changed, 86 insertions, 61 deletions
diff --git a/config/application.yml b/config/application.yml index 613ad62..198903c 100644 --- a/config/application.yml +++ b/config/application.yml @@ -3,7 +3,7 @@ nacos: type: yaml server-addr: 192.168.44.12:8848 namespace: test - data-id: p19-file-sync-service + data-id: galaxy-fsync-service auto-refresh: true group: Galaxy username: nacos @@ -6,7 +6,7 @@ <groupId>com.zdjizhi</groupId> <artifactId>galaxy-fsync-service</artifactId> - <version>23.11.23</version> + <version>24.01.17</version> <name>galaxy-fsync-service</name> <parent> diff --git a/src/main/java/com/zdjizhi/syncfile/FileSyncServiceApplication.java b/src/main/java/com/zdjizhi/syncfile/FileSyncServiceApplication.java index c345863..a4ef842 100644 --- a/src/main/java/com/zdjizhi/syncfile/FileSyncServiceApplication.java +++ b/src/main/java/com/zdjizhi/syncfile/FileSyncServiceApplication.java @@ -14,6 +14,7 @@ public class FileSyncServiceApplication { public static void main(String[] args) { SpringApplication.run(FileSyncServiceApplication.class, args); } + @Bean MeterRegistryCustomizer<MeterRegistry> configurer(@Value("${spring.application.name}") String applicationName) { return registry -> registry.config().commonTags("application", applicationName); diff --git a/src/main/java/com/zdjizhi/syncfile/config/HttpClientPool.java b/src/main/java/com/zdjizhi/syncfile/config/HttpClientPool.java index 6e0c6a5..8cc83c6 100644 --- a/src/main/java/com/zdjizhi/syncfile/config/HttpClientPool.java +++ b/src/main/java/com/zdjizhi/syncfile/config/HttpClientPool.java @@ -10,21 +10,23 @@ import org.apache.http.NoHttpResponseException; import org.apache.http.client.HttpRequestRetryHandler; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.config.Registry; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.conn.socket.ConnectionSocketFactory; +import org.apache.http.conn.socket.PlainConnectionSocketFactory; import org.apache.http.conn.ssl.NoopHostnameVerifier; -import org.apache.http.conn.ssl.TrustSelfSignedStrategy; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; -import org.apache.http.protocol.HttpContext; -import org.apache.http.ssl.SSLContextBuilder; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import javax.net.ssl.*; -import java.io.IOException; import java.io.InterruptedIOException; import java.net.ConnectException; import java.net.UnknownHostException; +import java.security.cert.X509Certificate; @Component @NacosConfigurationProperties(prefix = "http", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true) @@ -81,49 +83,73 @@ public class HttpClientPool { public CloseableHttpClient getCloseableHttpClient() { CloseableHttpClient httpClient = null; try { - HttpRequestRetryHandler httpRetryHandler = new HttpRequestRetryHandler() { + HttpRequestRetryHandler httpRetryHandler = (exception, executionCount, context) -> { + if (executionCount >= retryNum) {// 如果已经重试了3次,就放弃 + log.error("已完成重试次数"); + return false; + } + if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试 + return true; + } + if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常 + return false; + } + if (exception instanceof ConnectException) {// 连接被拒绝 + return false; + } + if (exception instanceof InterruptedIOException) {// 超时 + return true; + } + if (exception instanceof UnknownHostException) {// 目标服务器不可达 + return false; + } + if (exception instanceof SSLException) {// ssl握手异常 + return false; + } + HttpClientContext clientContext = HttpClientContext.adapt(context); + HttpRequest request = clientContext.getRequest(); + // 如果请求是幂等的,就再次尝试 + return !(request instanceof HttpEntityEnclosingRequest); + }; + + RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(connectTimeout) + .setConnectionRequestTimeout(connectionRequestTimeout) + .setSocketTimeout(socketTimeout) + .setStaleConnectionCheckEnabled(staleConnectionCheckEnabled) + .build(); + X509TrustManager trustManager = new X509TrustManager() { @Override - public boolean retryRequest(IOException exception, int executionCount, HttpContext context) { - if (executionCount >= retryNum) {// 如果已经重试了3次,就放弃 - log.error("已完成重试次数"); - return false; - } - if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试 - return true; - } - if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常 - return false; - } - if (exception instanceof ConnectException) {// 连接被拒绝 - return false; - } - if (exception instanceof InterruptedIOException) {// 超时 - return true; - } - if (exception instanceof UnknownHostException) {// 目标服务器不可达 - return false; - } - if (exception instanceof SSLException) {// ssl握手异常 - return false; - } - HttpClientContext clientContext = HttpClientContext.adapt(context); - HttpRequest request = clientContext.getRequest(); - // 如果请求是幂等的,就再次尝试 - return !(request instanceof HttpEntityEnclosingRequest); + public X509Certificate[] getAcceptedIssuers() { + return null; + } + + @Override + public void checkClientTrusted(X509Certificate[] xcs, String str) { + } + + @Override + public void checkServerTrusted(X509Certificate[] xcs, String str) { } }; - PoolingHttpClientConnectionManager httpClientConnectionManager = new PoolingHttpClientConnectionManager(); + SSLContext ctx = SSLContext.getInstance(SSLConnectionSocketFactory.TLS); + ctx.init(null, new TrustManager[]{trustManager}, null); + SSLConnectionSocketFactory socketFactory = new SSLConnectionSocketFactory(ctx, NoopHostnameVerifier.INSTANCE); + Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create() + .register("http", PlainConnectionSocketFactory.INSTANCE) + .register("https", socketFactory).build(); + // 创建ConnectionManager,添加Connection配置信息 + PoolingHttpClientConnectionManager httpClientConnectionManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry); + // 设置最大连接数 httpClientConnectionManager.setMaxTotal(maxTotal); + // 设置每个连接的路由数 httpClientConnectionManager.setDefaultMaxPerRoute(defaultMaxPerRoute); - SSLContext sslContext = SSLContextBuilder.create() - .loadTrustMaterial(new TrustSelfSignedStrategy()) - .build(); - httpClient = HttpClientBuilder - .create() - .setConnectionManager(httpClientConnectionManager) + httpClient = HttpClients.custom() + // 把请求相关的超时信息设置到连接客户端 + .setDefaultRequestConfig(requestConfig) + // 把请求重试设置到连接客户端 .setRetryHandler(httpRetryHandler) - .setSslcontext(sslContext) - .setSSLHostnameVerifier(new NoopHostnameVerifier()) + // 配置连接池管理对象 + .setConnectionManager(httpClientConnectionManager) .build(); } catch (Exception e) { log.error("create httpClient error.", e); @@ -131,12 +157,4 @@ public class HttpClientPool { return httpClient; } - @Bean(name = "requestConfig") - public RequestConfig getRequestConfig() { - return RequestConfig.custom().setConnectTimeout(connectTimeout) - .setConnectionRequestTimeout(connectionRequestTimeout) - .setSocketTimeout(socketTimeout) - .setStaleConnectionCheckEnabled(staleConnectionCheckEnabled) - .build(); - } }
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/syncfile/config/KafkaConsumer.java b/src/main/java/com/zdjizhi/syncfile/config/KafkaConsumer.java index b630273..f11e214 100644 --- a/src/main/java/com/zdjizhi/syncfile/config/KafkaConsumer.java +++ b/src/main/java/com/zdjizhi/syncfile/config/KafkaConsumer.java @@ -105,10 +105,10 @@ public class KafkaConsumer { propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, group_id); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, auto_offset_reset); - if(servers.contains("9094")){ + if (servers.contains("9094")) { propsMap.put("security.protocol", "SASL_PLAINTEXT"); propsMap.put("sasl.mechanism", "PLAIN"); - propsMap.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="+sasl_username+" password="+sasl_password+";"); + propsMap.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + sasl_username + " password=" + sasl_password + ";"); } return propsMap; } diff --git a/src/main/java/com/zdjizhi/syncfile/core/SyncFiles.java b/src/main/java/com/zdjizhi/syncfile/core/SyncFiles.java index 14a659f..da105ee 100644 --- a/src/main/java/com/zdjizhi/syncfile/core/SyncFiles.java +++ b/src/main/java/com/zdjizhi/syncfile/core/SyncFiles.java @@ -75,7 +75,8 @@ public class SyncFiles { log.error("Sync file failed, post oss file error. source_oss_path: {}, destination_oss_path: {}", source_oss_path, destination_oss_path); monitorProperties.addPostFileErrorCount(); } - } else if (hosResponse.getCode() == 404 && hosResponse.getMsg().contains("NoSuchKey")) { + } else if (hosResponse.getCode() == 404 && hosResponse.getMsg().contains("NoSuchKey") + || hosResponse.getCode() == 409 && hosResponse.getMsg().contains("IncompleteObject")) { isRetry = true; } else { log.error("Sync file failed, get hos file error. source_oss_path: {}, destination_oss_path: {}", source_oss_path, destination_oss_path); diff --git a/src/main/java/com/zdjizhi/syncfile/utils/HttpUtil.java b/src/main/java/com/zdjizhi/syncfile/utils/HttpUtil.java index 5240284..c7fe16f 100644 --- a/src/main/java/com/zdjizhi/syncfile/utils/HttpUtil.java +++ b/src/main/java/com/zdjizhi/syncfile/utils/HttpUtil.java @@ -25,8 +25,6 @@ public class HttpUtil { @Autowired private CloseableHttpClient httpClient; @Autowired - private RequestConfig requestConfig; - @Autowired private MonitorProperties monitorProperties; public HosResponse httpGetFile(String url) { @@ -34,7 +32,6 @@ public class HttpUtil { CloseableHttpResponse response = null; try { HttpGet httpGet = new HttpGet(url); - httpGet.setConfig(requestConfig); response = httpClient.execute(httpGet); int statusCode = response.getStatusLine().getStatusCode(); hosResponse.setCode(statusCode); @@ -46,8 +43,17 @@ public class HttpUtil { monitorProperties.addDownloadFileSize(data.length); } else if (statusCode == 404) { String msg = EntityUtils.toString(response.getEntity(), "UTF-8"); - hosResponse.setMsg(msg); - if (!msg.contains("NoSuchKey")) { + if (msg.contains("NoSuchKey")) { + hosResponse.setMsg(msg); + } else { + log.error("get file error. current url: {}, code: {}, msg: {}", url, statusCode, msg); + monitorProperties.addFileSyncError(); + } + } else if (statusCode == 409) { + String msg = EntityUtils.toString(response.getEntity(), "UTF-8"); + if (msg.contains("IncompleteObject")) { + hosResponse.setMsg(msg); + } else { log.error("get file error. current url: {}, code: {}, msg: {}", url, statusCode, msg); monitorProperties.addFileSyncError(); } @@ -72,7 +78,6 @@ public class HttpUtil { CloseableHttpResponse response = null; try { HttpPost httpPost = new HttpPost(url); - httpPost.setConfig(requestConfig); httpPost.setEntity(new ByteArrayEntity(fileData)); response = httpClient.execute(httpPost); String responseEntity = EntityUtils.toString(response.getEntity(), "UTF-8"); |
