summaryrefslogtreecommitdiff
path: root/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java
diff options
context:
space:
mode:
authorhoujinchuan <[email protected]>2024-07-17 18:10:02 +0800
committerhoujinchuan <[email protected]>2024-07-17 18:10:02 +0800
commitbeb553dddfa278dde1501c5dedc64f886f573db9 (patch)
tree759e380cad476760349c07f1b16ba41e414c8942 /src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java
parentda7fecc4c10015dff0f4008b8a442302648ed8d8 (diff)
修复hos挂掉一台恢复后,hos sink负载不均衡的问题HEADv1.3.5develop
Diffstat (limited to 'src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java')
-rw-r--r--src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java12
1 files changed, 6 insertions, 6 deletions
diff --git a/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java b/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java
index 8aaecff..542bc26 100644
--- a/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java
+++ b/src/main/java/com/zdjizhi/sink/OssSinkByCaffeineCache.java
@@ -71,10 +71,10 @@ public class OssSinkByCaffeineCache extends RichSinkFunction<FileChunk> {
public transient Counter between100KBAnd1MBTxtChunksCounter;
public transient Counter greaterThan10MBTxtChunksCounter;
private boolean isAsync;
- private CloseableHttpClient syncHttpClient;
- private CloseableHttpAsyncClient asyncHttpClient;
+ private transient CloseableHttpClient syncHttpClient;
+ private transient CloseableHttpAsyncClient asyncHttpClient;
private List<String> endpointList;
- private Cache<String, FileChunk> cache;
+ private transient Cache<String, FileChunk> cache;
public OssSinkByCaffeineCache(Configuration configuration) {
this.configuration = configuration;
@@ -242,9 +242,9 @@ public class OssSinkByCaffeineCache extends RichSinkFunction<FileChunk> {
calculateFileChunkMetrics(fileChunk, fileId);
}
- private void executeRequest(HttpPost httpPost, String url) throws RuntimeException{
+ private void executeRequest(HttpPost httpPost, String url) throws RuntimeException {
if (isAsync) {
- asyncHttpClient.execute(httpPost, new FutureCallback<HttpResponse>() {
+ asyncHttpClient.execute(httpPost, new FutureCallback<>() {
@Override
public void completed(HttpResponse httpResponse) {
try {
@@ -275,7 +275,7 @@ public class OssSinkByCaffeineCache extends RichSinkFunction<FileChunk> {
@Override
public void cancelled() {
-
+ LOG.error("post file error. request cancelled. url: " + url);
}
});
} else {