summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--config/application.yml2
-rw-r--r--pom.xml2
-rw-r--r--src/main/java/com/zdjizhi/syncfile/FileSyncServiceApplication.java1
-rw-r--r--src/main/java/com/zdjizhi/syncfile/config/HttpClientPool.java118
-rw-r--r--src/main/java/com/zdjizhi/syncfile/config/KafkaConsumer.java4
-rw-r--r--src/main/java/com/zdjizhi/syncfile/core/SyncFiles.java3
-rw-r--r--src/main/java/com/zdjizhi/syncfile/utils/HttpUtil.java17
7 files changed, 86 insertions, 61 deletions
diff --git a/config/application.yml b/config/application.yml
index 613ad62..198903c 100644
--- a/config/application.yml
+++ b/config/application.yml
@@ -3,7 +3,7 @@ nacos:
type: yaml
server-addr: 192.168.44.12:8848
namespace: test
- data-id: p19-file-sync-service
+ data-id: galaxy-fsync-service
auto-refresh: true
group: Galaxy
username: nacos
diff --git a/pom.xml b/pom.xml
index 2c53f4b..a7ae952 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy-fsync-service</artifactId>
- <version>23.11.23</version>
+ <version>24.01.17</version>
<name>galaxy-fsync-service</name>
<parent>
diff --git a/src/main/java/com/zdjizhi/syncfile/FileSyncServiceApplication.java b/src/main/java/com/zdjizhi/syncfile/FileSyncServiceApplication.java
index c345863..a4ef842 100644
--- a/src/main/java/com/zdjizhi/syncfile/FileSyncServiceApplication.java
+++ b/src/main/java/com/zdjizhi/syncfile/FileSyncServiceApplication.java
@@ -14,6 +14,7 @@ public class FileSyncServiceApplication {
public static void main(String[] args) {
SpringApplication.run(FileSyncServiceApplication.class, args);
}
+
@Bean
MeterRegistryCustomizer<MeterRegistry> configurer(@Value("${spring.application.name}") String applicationName) {
return registry -> registry.config().commonTags("application", applicationName);
diff --git a/src/main/java/com/zdjizhi/syncfile/config/HttpClientPool.java b/src/main/java/com/zdjizhi/syncfile/config/HttpClientPool.java
index 6e0c6a5..8cc83c6 100644
--- a/src/main/java/com/zdjizhi/syncfile/config/HttpClientPool.java
+++ b/src/main/java/com/zdjizhi/syncfile/config/HttpClientPool.java
@@ -10,21 +10,23 @@ import org.apache.http.NoHttpResponseException;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
-import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
-import org.apache.http.protocol.HttpContext;
-import org.apache.http.ssl.SSLContextBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import javax.net.ssl.*;
-import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.UnknownHostException;
+import java.security.cert.X509Certificate;
@Component
@NacosConfigurationProperties(prefix = "http", dataId = "${nacos.config.data-id}", groupId = "${nacos.config.group}", type = ConfigType.YAML, autoRefreshed = true)
@@ -81,49 +83,73 @@ public class HttpClientPool {
public CloseableHttpClient getCloseableHttpClient() {
CloseableHttpClient httpClient = null;
try {
- HttpRequestRetryHandler httpRetryHandler = new HttpRequestRetryHandler() {
+ HttpRequestRetryHandler httpRetryHandler = (exception, executionCount, context) -> {
+ if (executionCount >= retryNum) {// 如果已经重试了3次,就放弃
+ log.error("已完成重试次数");
+ return false;
+ }
+ if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试
+ return true;
+ }
+ if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常
+ return false;
+ }
+ if (exception instanceof ConnectException) {// 连接被拒绝
+ return false;
+ }
+ if (exception instanceof InterruptedIOException) {// 超时
+ return true;
+ }
+ if (exception instanceof UnknownHostException) {// 目标服务器不可达
+ return false;
+ }
+ if (exception instanceof SSLException) {// ssl握手异常
+ return false;
+ }
+ HttpClientContext clientContext = HttpClientContext.adapt(context);
+ HttpRequest request = clientContext.getRequest();
+ // 如果请求是幂等的,就再次尝试
+ return !(request instanceof HttpEntityEnclosingRequest);
+ };
+
+ RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(connectTimeout)
+ .setConnectionRequestTimeout(connectionRequestTimeout)
+ .setSocketTimeout(socketTimeout)
+ .setStaleConnectionCheckEnabled(staleConnectionCheckEnabled)
+ .build();
+ X509TrustManager trustManager = new X509TrustManager() {
@Override
- public boolean retryRequest(IOException exception, int executionCount, HttpContext context) {
- if (executionCount >= retryNum) {// 如果已经重试了3次,就放弃
- log.error("已完成重试次数");
- return false;
- }
- if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试
- return true;
- }
- if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常
- return false;
- }
- if (exception instanceof ConnectException) {// 连接被拒绝
- return false;
- }
- if (exception instanceof InterruptedIOException) {// 超时
- return true;
- }
- if (exception instanceof UnknownHostException) {// 目标服务器不可达
- return false;
- }
- if (exception instanceof SSLException) {// ssl握手异常
- return false;
- }
- HttpClientContext clientContext = HttpClientContext.adapt(context);
- HttpRequest request = clientContext.getRequest();
- // 如果请求是幂等的,就再次尝试
- return !(request instanceof HttpEntityEnclosingRequest);
+ public X509Certificate[] getAcceptedIssuers() {
+ return null;
+ }
+
+ @Override
+ public void checkClientTrusted(X509Certificate[] xcs, String str) {
+ }
+
+ @Override
+ public void checkServerTrusted(X509Certificate[] xcs, String str) {
}
};
- PoolingHttpClientConnectionManager httpClientConnectionManager = new PoolingHttpClientConnectionManager();
+ SSLContext ctx = SSLContext.getInstance(SSLConnectionSocketFactory.TLS);
+ ctx.init(null, new TrustManager[]{trustManager}, null);
+ SSLConnectionSocketFactory socketFactory = new SSLConnectionSocketFactory(ctx, NoopHostnameVerifier.INSTANCE);
+ Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
+ .register("http", PlainConnectionSocketFactory.INSTANCE)
+ .register("https", socketFactory).build();
+ // 创建ConnectionManager,添加Connection配置信息
+ PoolingHttpClientConnectionManager httpClientConnectionManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
+ // 设置最大连接数
httpClientConnectionManager.setMaxTotal(maxTotal);
+ // 设置每个连接的路由数
httpClientConnectionManager.setDefaultMaxPerRoute(defaultMaxPerRoute);
- SSLContext sslContext = SSLContextBuilder.create()
- .loadTrustMaterial(new TrustSelfSignedStrategy())
- .build();
- httpClient = HttpClientBuilder
- .create()
- .setConnectionManager(httpClientConnectionManager)
+ httpClient = HttpClients.custom()
+ // 把请求相关的超时信息设置到连接客户端
+ .setDefaultRequestConfig(requestConfig)
+ // 把请求重试设置到连接客户端
.setRetryHandler(httpRetryHandler)
- .setSslcontext(sslContext)
- .setSSLHostnameVerifier(new NoopHostnameVerifier())
+ // 配置连接池管理对象
+ .setConnectionManager(httpClientConnectionManager)
.build();
} catch (Exception e) {
log.error("create httpClient error.", e);
@@ -131,12 +157,4 @@ public class HttpClientPool {
return httpClient;
}
- @Bean(name = "requestConfig")
- public RequestConfig getRequestConfig() {
- return RequestConfig.custom().setConnectTimeout(connectTimeout)
- .setConnectionRequestTimeout(connectionRequestTimeout)
- .setSocketTimeout(socketTimeout)
- .setStaleConnectionCheckEnabled(staleConnectionCheckEnabled)
- .build();
- }
} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/syncfile/config/KafkaConsumer.java b/src/main/java/com/zdjizhi/syncfile/config/KafkaConsumer.java
index b630273..f11e214 100644
--- a/src/main/java/com/zdjizhi/syncfile/config/KafkaConsumer.java
+++ b/src/main/java/com/zdjizhi/syncfile/config/KafkaConsumer.java
@@ -105,10 +105,10 @@ public class KafkaConsumer {
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, group_id);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, auto_offset_reset);
- if(servers.contains("9094")){
+ if (servers.contains("9094")) {
propsMap.put("security.protocol", "SASL_PLAINTEXT");
propsMap.put("sasl.mechanism", "PLAIN");
- propsMap.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="+sasl_username+" password="+sasl_password+";");
+ propsMap.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + sasl_username + " password=" + sasl_password + ";");
}
return propsMap;
}
diff --git a/src/main/java/com/zdjizhi/syncfile/core/SyncFiles.java b/src/main/java/com/zdjizhi/syncfile/core/SyncFiles.java
index 14a659f..da105ee 100644
--- a/src/main/java/com/zdjizhi/syncfile/core/SyncFiles.java
+++ b/src/main/java/com/zdjizhi/syncfile/core/SyncFiles.java
@@ -75,7 +75,8 @@ public class SyncFiles {
log.error("Sync file failed, post oss file error. source_oss_path: {}, destination_oss_path: {}", source_oss_path, destination_oss_path);
monitorProperties.addPostFileErrorCount();
}
- } else if (hosResponse.getCode() == 404 && hosResponse.getMsg().contains("NoSuchKey")) {
+ } else if (hosResponse.getCode() == 404 && hosResponse.getMsg().contains("NoSuchKey")
+ || hosResponse.getCode() == 409 && hosResponse.getMsg().contains("IncompleteObject")) {
isRetry = true;
} else {
log.error("Sync file failed, get hos file error. source_oss_path: {}, destination_oss_path: {}", source_oss_path, destination_oss_path);
diff --git a/src/main/java/com/zdjizhi/syncfile/utils/HttpUtil.java b/src/main/java/com/zdjizhi/syncfile/utils/HttpUtil.java
index 5240284..c7fe16f 100644
--- a/src/main/java/com/zdjizhi/syncfile/utils/HttpUtil.java
+++ b/src/main/java/com/zdjizhi/syncfile/utils/HttpUtil.java
@@ -25,8 +25,6 @@ public class HttpUtil {
@Autowired
private CloseableHttpClient httpClient;
@Autowired
- private RequestConfig requestConfig;
- @Autowired
private MonitorProperties monitorProperties;
public HosResponse httpGetFile(String url) {
@@ -34,7 +32,6 @@ public class HttpUtil {
CloseableHttpResponse response = null;
try {
HttpGet httpGet = new HttpGet(url);
- httpGet.setConfig(requestConfig);
response = httpClient.execute(httpGet);
int statusCode = response.getStatusLine().getStatusCode();
hosResponse.setCode(statusCode);
@@ -46,8 +43,17 @@ public class HttpUtil {
monitorProperties.addDownloadFileSize(data.length);
} else if (statusCode == 404) {
String msg = EntityUtils.toString(response.getEntity(), "UTF-8");
- hosResponse.setMsg(msg);
- if (!msg.contains("NoSuchKey")) {
+ if (msg.contains("NoSuchKey")) {
+ hosResponse.setMsg(msg);
+ } else {
+ log.error("get file error. current url: {}, code: {}, msg: {}", url, statusCode, msg);
+ monitorProperties.addFileSyncError();
+ }
+ } else if (statusCode == 409) {
+ String msg = EntityUtils.toString(response.getEntity(), "UTF-8");
+ if (msg.contains("IncompleteObject")) {
+ hosResponse.setMsg(msg);
+ } else {
log.error("get file error. current url: {}, code: {}, msg: {}", url, statusCode, msg);
monitorProperties.addFileSyncError();
}
@@ -72,7 +78,6 @@ public class HttpUtil {
CloseableHttpResponse response = null;
try {
HttpPost httpPost = new HttpPost(url);
- httpPost.setConfig(requestConfig);
httpPost.setEntity(new ByteArrayEntity(fileData));
response = httpClient.execute(httpPost);
String responseEntity = EntityUtils.toString(response.getEntity(), "UTF-8");