summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2020-10-28 18:57:16 +0800
committerdoufenghu <[email protected]>2020-10-28 18:57:16 +0800
commitbe3891589084ff28c214c9e9c19ebeacb5a6e8fe (patch)
tree16b2354b14caaaa7be2822b621fd9d16169584aa
parent2273acd47c6bd312f770e1beae05185d07a2b22d (diff)
fix(batch): 增加无法链接错误计数1.2.1
-rw-r--r--yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncBatchMsgCallBack.java1
-rw-r--r--yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncSingleMsgCallBack.java1
-rw-r--r--yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/HttpManager.java10
3 files changed, 7 insertions, 5 deletions
diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncBatchMsgCallBack.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncBatchMsgCallBack.java
index 43c8eab..f1e94b3 100644
--- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncBatchMsgCallBack.java
+++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncBatchMsgCallBack.java
@@ -194,6 +194,7 @@ public class AsyncBatchMsgCallBack implements FutureCallback<HttpResponse> {
*/
@Override
public void failed(Exception e) {
+ AvroMonitorTimerTask.addFailedNum(count);
logger.error("AsyncPostMagCallBack Request is Failed,This Failed data checksum is ==>"+ checksum);
}
diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncSingleMsgCallBack.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncSingleMsgCallBack.java
index 2d7eb33..416a9a3 100644
--- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncSingleMsgCallBack.java
+++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncSingleMsgCallBack.java
@@ -208,6 +208,7 @@ public class AsyncSingleMsgCallBack implements FutureCallback<HttpResponse> {
*/
@Override
public void failed(Exception e) {
+ AvroMonitorTimerTask.addFailedNum(count);
logger.error("AsyncSingleMsgCallBack Request is Failed,This Failed data is ==>"
+ dataJson );
}
diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/HttpManager.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/HttpManager.java
index 2c1372e..231a929 100644
--- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/HttpManager.java
+++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/HttpManager.java
@@ -175,7 +175,7 @@ public class HttpManager {
LaxRedirectStrategy redirectStrategy = new LaxRedirectStrategy();
- ConnectionKeepAliveStrategy myStrategy = (response, context) -> {
+ /* ConnectionKeepAliveStrategy myStrategy = (response, context) -> {
HeaderElementIterator it = new BasicHeaderElementIterator
(response.headerIterator(HTTP.CONN_KEEP_ALIVE));
while (it.hasNext()) {
@@ -187,7 +187,7 @@ public class HttpManager {
}
}
return 10 * 1000;//如果没有约定,则默认定义时长为60s
- };
+ };*/
/**
* 原版
@@ -197,7 +197,7 @@ public class HttpManager {
.setDefaultRequestConfig(requestConfig)
.setRedirectStrategy(redirectStrategy)
.setRetryHandler(myRetryHandler)
- .setKeepAliveStrategy(myStrategy)
+ // .setKeepAliveStrategy(myStrategy)
.build();
return httpClient;
@@ -648,7 +648,7 @@ public class HttpManager {
httpPost.addHeader("User-Agent", userAgent);
httpPost.addHeader("X-Tag", xTag);//根据最新文档,目前已经不需要此头-20191217
httpPost.addHeader("Content-Type", "application/json");
- httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE);
+ // httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE);
StringEntity payload = new StringEntity(data, Charset.forName("utf-8"));
//payload.setContentType("text/xml; charset=UTF-8");
payload.setContentEncoding("utf-8");
@@ -699,7 +699,7 @@ public class HttpManager {
logger.error("释放链接错误: " + e.getMessage());
}
}
- //httpPost.abort();
+ httpPost.abort();
}
return acResBody;
}