summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordoufenghu <[email protected]>2020-10-27 17:06:02 +0800
committerdoufenghu <[email protected]>2020-10-27 17:06:02 +0800
commit2273acd47c6bd312f770e1beae05185d07a2b22d (patch)
treeb8bd8640f65b21fbfdb7ae28c2f74865aad2e26c
parent5e26e421ced08f595b9a6696285bed7575876c47 (diff)
fix(batch):HEADmaster
1. 支持单条与批量发送接口。-涉及所有各个Topic 的conf 文件,需要修改总线的文件路径 2. 解决总线负载过高,发送重定向URL,对内容进行缓存,造成内存溢出问题。 3. 增加批量回调接口与单条进行区分,单条目前支持缓存重定向后的错误数据进行重发,目前支持最大缓存10w条。
-rw-r--r--yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/CommonUtils/HttpClientTest.java8
-rw-r--r--yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/CommonUtils/SinkHttpClientUtil.java232
-rw-r--r--yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/AsyncHttpClientGetFileCallback.java2
-rw-r--r--yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/AsyncHttpClientPostFileCallback.java2
-rw-r--r--yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpAsyncClient.java3
-rw-r--r--yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpClientUtil.java633
-rw-r--r--yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/mail/AsyncGetMailFilesCallback.java2
-rw-r--r--yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/mail/AsyncPostMailFilesCallback.java4
-rw-r--r--yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncBatchMsgCallBack.java206
-rw-r--r--yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncSingleMsgCallBack.java (renamed from yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncPostMsgCallBack.java)47
-rw-r--r--yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/SinkService.java139
-rw-r--r--yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java351
-rw-r--r--yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/AvroMonitorTimerTask.java2
-rw-r--r--yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/HttpManager.java27
-rw-r--r--yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/bean/configBean/ConfigInfo.java22
-rw-r--r--yb_http_avro_sink_file/src/main/resources/flume_config.properties10
16 files changed, 443 insertions, 1247 deletions
diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/CommonUtils/HttpClientTest.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/CommonUtils/HttpClientTest.java
index f072cf6..9d8523f 100644
--- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/CommonUtils/HttpClientTest.java
+++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/CommonUtils/HttpClientTest.java
@@ -1,30 +1,22 @@
package cn.ac.iie.cusflume.sink.CommonUtils;
-import cn.ac.iie.cusflume.sink.HttpAsyncUtils.HttpClientFactory;
import cn.ac.iie.cusflume.sink.HttpAsyncUtils.HttpClientUtil;
-import cn.ac.iie.cusflume.sink.HttpAsyncUtils.msgCallBack.AsyncPostMsgCallBack;
import cn.ac.iie.cusflume.sink.avroUtils.MD5Utils;
-import com.alibaba.fastjson.JSON;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
-import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
-import org.apache.http.protocol.HTTP;
import org.apache.http.util.EntityUtils;
import org.apache.log4j.Logger;
import org.mortbay.log.Log;
-import scala.util.Try;
import java.io.IOException;
-import java.net.MalformedURLException;
public class HttpClientTest {
diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/CommonUtils/SinkHttpClientUtil.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/CommonUtils/SinkHttpClientUtil.java
deleted file mode 100644
index acb69dd..0000000
--- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/CommonUtils/SinkHttpClientUtil.java
+++ /dev/null
@@ -1,232 +0,0 @@
-package cn.ac.iie.cusflume.sink.CommonUtils;
-
-import cn.ac.iie.cusflume.sink.YbHttpAvroSinkFile;
-import cn.ac.iie.cusflume.sink.avroUtils.AvroMonitorTimerTask;
-import cn.ac.iie.cusflume.sink.bean.redirectBean.ResRedirBody;
-import cn.ac.iie.cusflume.sink.daoUtils.RealtimeCountConfig;
-import com.alibaba.fastjson.JSONObject;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.commons.lang.StringUtils;
-import org.apache.http.*;
-import org.apache.http.client.ClientProtocolException;
-import org.apache.http.client.HttpRequestRetryHandler;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.protocol.HttpClientContext;
-import org.apache.http.conn.ConnectTimeoutException;
-import org.apache.http.conn.ConnectionKeepAliveStrategy;
-import org.apache.http.entity.ByteArrayEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
-import org.apache.http.message.BasicHeaderElementIterator;
-import org.apache.http.protocol.HTTP;
-import org.apache.http.util.EntityUtils;
-import org.apache.log4j.Logger;
-import org.mortbay.log.Log;
-
-import javax.net.ssl.SSLException;
-import javax.net.ssl.SSLHandshakeException;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.UnknownHostException;
-
-public class SinkHttpClientUtil {
- private static Logger LOG = Logger.getLogger(SinkHttpClientUtil.class);
- /** 全局连接池对象 */
- private static final PoolingHttpClientConnectionManager connManager =
- new PoolingHttpClientConnectionManager();
- public static final String DEFAULT_CHARSET = "utf-8";
- private static int socketTimeout = RealtimeCountConfig.HTTP_ASYNC_SOCKETTIMEOUT;//设置等待数据超时时间60秒钟 根据业务调整
- private static int connectTimeout = RealtimeCountConfig.HTTP_ASYNC_CONNECTTIMEOUT;//连接超时
- private static int poolSize = RealtimeCountConfig.HTTP_ASYNC_POOLSIZE;//连接池最大连接数
- private static int maxPerRoute = RealtimeCountConfig.HTTP_ASYNC_MAXPERROUTE;//每个主机的并发最多只有1500
- private static int connectionRequestTimeout = RealtimeCountConfig.HTTP_ASYNC_CONNECTIONREQUESTTIMEOUT; //从连接池中后去连接的timeout时间
-
-
- static {
- connManager.setMaxTotal(poolSize);
- connManager.setDefaultMaxPerRoute(maxPerRoute);
- }
-
- public static CloseableHttpClient getHttpClient() {
- RequestConfig requestConfig = RequestConfig.custom()
- // 获取连接超时时间
- .setConnectionRequestTimeout(connectionRequestTimeout)
- // 请求超时时间
- .setConnectTimeout(connectTimeout)
- // 响应超时时间
- .setSocketTimeout(socketTimeout)
- .build();
-
-
- /**
- * 测出超时重试机制为了防止超时不生效而设置
- * 如果直接放回false,不重试
- * 这里会根据情况进行判断是否重试
- */
- HttpRequestRetryHandler retry = (exception, executionCount, context) -> {
- if (executionCount >= 3) {// 如果已经重试了3次,就放弃
- return false;
- }
- if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试
- return true;
- }
- if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常
- return false;
- }
- if (exception instanceof InterruptedIOException) {// 超时
- return true;
- }
- if (exception instanceof UnknownHostException) {// 目标服务器不可达
- return false;
- }
- if (exception instanceof ConnectTimeoutException) {// 连接被拒绝
- return false;
- }
- if (exception instanceof SSLException) {// ssl握手异常
- return false;
- }
- HttpClientContext clientContext = HttpClientContext.adapt(context);
- HttpRequest request = clientContext.getRequest();
- // 如果请求是幂等的,就再次尝试
- if (!(request instanceof HttpEntityEnclosingRequest)) {
- return true;
- }
- return false;
- };
-
-
- ConnectionKeepAliveStrategy myStrategy = (response, context) -> {
- HeaderElementIterator it = new BasicHeaderElementIterator
- (response.headerIterator(HTTP.CONN_KEEP_ALIVE));
- while (it.hasNext()) {
- HeaderElement he = it.nextElement();
- String param = he.getName();
- String value = he.getValue();
- if (value != null && param.equalsIgnoreCase("timeout")) {
- return Long.parseLong(value) * 1000;
- }
- }
- return 60 * 1000;//如果没有约定,则默认定义时长为60s
- };
-
- // 创建httpClient
- return HttpClients.custom()
- // 把请求相关的超时信息设置到连接客户端
- .setDefaultRequestConfig(requestConfig)
- // 把请求重试设置到连接客户端
- .setRetryHandler(retry)
- .setKeepAliveStrategy(myStrategy)
- // 配置连接池管理对象
- .setConnectionManager(connManager)
- .build();
-
- }
-
-
- public static void httpPost(String url, byte[] requestBody, int batchSize, Header... headers) {
- String msg = "-1";
- // 获取客户端连接对象
- CloseableHttpClient httpClient = getHttpClient();
- // 创建POST请求对象
- CloseableHttpResponse response = null;
- try {
- HttpPost httpPost = new HttpPost(url);
- httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE);
- httpPost.setHeader("Content-Type", "application/json");
- if (StringUtil.isNotEmpty(headers)) {
- for (Header h : headers) {
- httpPost.addHeader(h);
- }
- }
- ByteArrayEntity payload = new ByteArrayEntity(requestBody);
- payload.setContentEncoding("utf-8");
- httpPost.setEntity(payload);
- long startTime = System.currentTimeMillis();
- response = httpClient.execute(httpPost);
- int statusCode = response.getStatusLine().getStatusCode();
- // 获取响应实体
- HttpEntity entity = response.getEntity();
- // 获取响应信息
- msg = EntityUtils.toString(entity, "UTF-8");
- Log.warn(Thread.currentThread().getName() + ",请求响应用时(ms):" + (System.currentTimeMillis() - startTime));
- if (statusCode != HttpStatus.SC_OK ) {
- AvroMonitorTimerTask.addSuccessNum(batchSize);
- LOG.info("数据总线响应内容:" + msg);
- } else {
- AvroMonitorTimerTask.addFailedNum(batchSize);
- LOG.error("数据总线反馈出错:" + statusCode + ",msg" + msg);
-
- switch (statusCode) {
- case 300:
- LOG.info("AsyncPostMsgCallBack==>重定向响应体-redirect-ret==>"
- + ",resRedirBodyCode:300,当前服务节点负载过高,将向其他通信地址发送请求.");
- //若不包含对应字段,则不进行对象转换,减少报错
- if (msg.contains("redirect")) {
- ResRedirBody resRedirBody = JSONObject.parseObject(msg, ResRedirBody.class);
- String redirectUrl = resRedirBody.getData().getRedirect();
- if (StringUtils.isNotBlank(redirectUrl)) {
- YbHttpAvroSinkFile.changeUrl(redirectUrl);
- }
- } else {
- LOG.error("AsyncPostMsgCallBack==>服务端响应体中ResRedirBody.data.redirect字段不存在或格式不正确!!!");
- }
-
-
- break;
- case 301:
- LOG.error("AsyncPostMsgCallBack==>Status Code:" + statusCode +
- ",resRedirBodyCode:301,当前所有服务端节点均负载过高,暂无可用资源,请等待.");
- break;
- case 410:
- LOG.error("AsyncPostMsgCallBack==>Status Code:" + statusCode
- + "服务端响应时间(ms)==>" + "<==,"
- + ",resRedirBodyCode:410,Cookie已过期或Cookie错误,将开始更新Cookie.");
- YbHttpAvroSinkFile.updateCookie();
- break;
- case 500:
- LOG.error("AsyncPostMsgCallBack==>Status Code:" + statusCode
- + "服务端响应时间(ms)==>" + "<==,"
- + ",resRedirBodyCode:500,处理请求过程出现系统错误.");
- YbHttpAvroSinkFile.updateCookie();
- break;
- default:
- LOG.error("AsyncPostMsgCallBack==>数据加载失败,响应体:" + "<==,"
- + "---Status Code:" + statusCode );
- break;
- }
-
- }
-
-
-
- } catch (ClientProtocolException e) {
- LOG.error("协议错误: {}", e);
- } catch (ParseException e) {
- LOG.error("解析错误: {}", e);
- } catch (IOException e) {
- LOG.error("IO错误: {}", e);
- } finally {
- if (null != response) {
- try {
- EntityUtils.consumeQuietly(response.getEntity());
- response.close();
- } catch (IOException e) {
- LOG.error("释放链接错误: {}", e);
- }
- }
- }
-
- }
-
-
-
-
-
-
-
-
-
-}
diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/AsyncHttpClientGetFileCallback.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/AsyncHttpClientGetFileCallback.java
index 944d5ed..66d7e88 100644
--- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/AsyncHttpClientGetFileCallback.java
+++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/AsyncHttpClientGetFileCallback.java
@@ -52,7 +52,7 @@ public class AsyncHttpClientGetFileCallback implements FutureCallback<HttpRespon
public AsyncHttpClientGetFileCallback(ConfigInfo configInfo, String getFileUrl, String sendMsg, AsyncHttpClientPostFileCallback asyncHttpClientPostFileCallback, int count) {
this.configInfo = configInfo;
this.postFileUrl = configInfo.getPostFileUrl();//通过configInfo赋值
- this.postMsgUrl = configInfo.getPostMsgUrl();//通过configInfo赋值
+ this.postMsgUrl = configInfo.getSingleProduceUrl();//通过configInfo赋值
this.getFileUrl = getFileUrl;
this.sendMsg = sendMsg;
this.asyncHttpClientPostFileCallback = asyncHttpClientPostFileCallback;
diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/AsyncHttpClientPostFileCallback.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/AsyncHttpClientPostFileCallback.java
index 6ed6e3f..303d196 100644
--- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/AsyncHttpClientPostFileCallback.java
+++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/AsyncHttpClientPostFileCallback.java
@@ -58,7 +58,7 @@ public class AsyncHttpClientPostFileCallback implements FutureCallback<HttpRespo
public AsyncHttpClientPostFileCallback(ConfigInfo configInfo, String sendMsg, int count) {
this.configInfo = configInfo;
this.postFileUrl = configInfo.getPostFileUrl();
- this.postMsgUrl = configInfo.getPostMsgUrl();
+ this.postMsgUrl = configInfo.getSingleProduceUrl();
this.sendMsg = sendMsg;//存放对应于url的数据
this.count = count;
diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpAsyncClient.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpAsyncClient.java
index aa70d8a..2785123 100644
--- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpAsyncClient.java
+++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpAsyncClient.java
@@ -68,6 +68,7 @@ public class HttpAsyncClient {
private CloseableHttpAsyncClient proxyAsyncHttpClient;
public HttpAsyncClient() {
+
try {
this.asyncHttpClient = createAsyncClient(false);
this.proxyAsyncHttpClient = createAsyncClient(true);
@@ -131,7 +132,7 @@ public class HttpAsyncClient {
.register("https", new SSLIOSessionStrategy(sslcontext, NoopHostnameVerifier.INSTANCE))
.build();
- // 配置io线程\
+ // 配置io线程
IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setSoKeepAlive(true)
.setTcpNoDelay(true)
diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpClientUtil.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpClientUtil.java
index 9fb923e..a27dfaf 100644
--- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpClientUtil.java
+++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/HttpClientUtil.java
@@ -4,8 +4,8 @@ import cn.ac.iie.cusflume.sink.CommonUtils.GetDataDictionaryCodeByTopicUtils;
import cn.ac.iie.cusflume.sink.CommonUtils.GetFilePathByTopicUtils;
import cn.ac.iie.cusflume.sink.CommonUtils.HttpClientTest;
import cn.ac.iie.cusflume.sink.HttpAsyncUtils.mail.AsyncPostMailFilesCallback;
-import cn.ac.iie.cusflume.sink.HttpAsyncUtils.msgCallBack.AsyncPostMsgCallBack;
-import cn.ac.iie.cusflume.sink.avroUtils.AvroMonitorTimerTask;
+import cn.ac.iie.cusflume.sink.HttpAsyncUtils.msgCallBack.AsyncBatchMsgCallBack;
+import cn.ac.iie.cusflume.sink.HttpAsyncUtils.msgCallBack.AsyncSingleMsgCallBack;
import cn.ac.iie.cusflume.sink.avroUtils.MD5Utils;
import cn.ac.iie.cusflume.sink.avroUtils.avroRecord.GetAvroRecordByTopicUtils;
import cn.ac.iie.cusflume.sink.avroUtils.avroSchema.GetAvroSchemaByTopicUtils;
@@ -21,7 +21,6 @@ import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.http.HttpEntity;
-import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -42,11 +41,12 @@ import org.apache.log4j.Logger;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -55,347 +55,22 @@ import java.util.concurrent.Executors;
* 执行请求发送
*/
public class HttpClientUtil {
-
- private static Logger LOG = Logger.getLogger(HttpClientUtil.class);
+ private static HashMap<String, Schema> schemaHashMap = new HashMap<String, Schema>();//用于存放Schma
+ private static Logger logger = Logger.getLogger(HttpClientUtil.class);
protected static ExecutorService pool = Executors.newFixedThreadPool(RealtimeCountConfig.HTTP_ASYNC_PARALLELISM);//线程池
+ private final static BlockingQueue<String> retryQueue = new ArrayBlockingQueue(100000);
+
private static String utf8Charset = "utf-8";
private static final MediaType JSON
= MediaType.parse("application/json; charset=utf-8");
- /**
- * 向指定的url发送一次post请求,参数是List<NameValuePair>
- *
- * @param baseUrl 请求地址
- * @param list 请求参数,格式是List<NameValuePair>
- * @return 返回结果, 请求失败时返回null
- * @apiNote http接口处用 @RequestParam接收参数
- */
- public static String httpSyncPost(String baseUrl, List<BasicNameValuePair> list) {
-
- CloseableHttpClient httpClient = HttpClientFactory.getInstance().getHttpSyncClientPool().getHttpClient();
- HttpPost httpPost = new HttpPost(baseUrl);
-
- //Parameters
- LOG.info("==== Parameters ======" + list);
- CloseableHttpResponse response = null;
- try {
- httpPost.setEntity(new UrlEncodedFormEntity(list));
-// httpPost.setHeader("Connection","close");
- response = httpClient.execute(httpPost);
- LOG.info("========HttpResponseProxy:========" + response.getStatusLine());
- HttpEntity entity = response.getEntity();
- String result = null;
- if (entity != null) {
- result = EntityUtils.toString(entity, "UTF-8");
- LOG.info("========Response=======" + result);
- }
- EntityUtils.consume(entity);
- return result;
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (response != null) {
- try {
- response.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- return null;
- }
-
- /**
- * 向指定的url发送一次post请求,参数是字符串
- *
- * @param baseUrl 请求地址
- * @param postString 请求参数,格式是json.toString()
- * @return 返回结果, 请求失败时返回null
- * @apiNote http接口处用 @RequestBody接收参数
- */
- public static String httpSyncPost(String baseUrl, String postString) {
-
- CloseableHttpClient httpClient = HttpClientFactory.getInstance().getHttpSyncClientPool().getHttpClient();
- HttpPost httpPost = new HttpPost(baseUrl);
- //parameters
- LOG.warn("==== Parameters ======" + postString);
- CloseableHttpResponse response = null;
- try {
- if (postString == null || "".equals(postString)) {
- throw new Exception("missing post String");
- }
-
- StringEntity stringEntity = new StringEntity(postString.toString(), utf8Charset);
- stringEntity.setContentEncoding("UTF-8");
- stringEntity.setContentType("application/json");
- httpPost.setEntity(stringEntity);
-
- response = httpClient.execute(httpPost);
- LOG.warn("========HttpResponseProxy:========" + response.getStatusLine());
- HttpEntity entity = response.getEntity();
- String result = null;
- if (entity != null) {
- result = EntityUtils.toString(entity, "UTF-8");
- LOG.warn("========Response=======" + result);
- }
- EntityUtils.consume(entity);
- return result;
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (response != null) {
- try {
- response.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- return null;
- }
-
-
- /**
- * 向指定的url发送一次get请求,参数是List<NameValuePair>
- *
- * @param baseUrl 请求地址
- * @param list 请求参数,格式是List<NameValuePair>
- * @return 返回结果, 请求失败时返回null
- * @apiNote http接口处用 @RequestParam接收参数
- */
- public static String httpSyncGet(String baseUrl, List<BasicNameValuePair> list) {
-
- CloseableHttpClient httpClient = HttpClientFactory.getInstance().getHttpSyncClientPool().getHttpClient();
- HttpGet httpGet = new HttpGet(baseUrl);
- //Parameters
- LOG.warn("==== Parameters ======" + list);
- CloseableHttpResponse response = null;
- try {
-
- if (list != null) {
- String getUrl = EntityUtils
- .toString(new UrlEncodedFormEntity(list));
- httpGet.setURI(new URI(httpGet.getURI().toString()
- + "?" + getUrl));
- } else {
- httpGet.setURI(new URI(httpGet.getURI().toString()));
- }
-
- response = httpClient.execute(httpGet);
- LOG.warn("========HttpResponseProxy:========" + response.getStatusLine());
- HttpEntity entity = response.getEntity();
- String result = null;
- if (entity != null) {
- result = EntityUtils.toString(entity, "UTF-8");
- LOG.warn("========Response=======" + result);
- }
- EntityUtils.consume(entity);
- return result;
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (response != null) {
- try {
- response.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- return null;
- }
-
- /**
- * 向指定的url发送一次get请求,参数是字符串
- *
- * @param baseUrl 请求地址
- * @param urlParams 请求参数,格式是String
- * @return 返回结果, 请求失败时返回null
- * @apiNote http接口处用 @RequestParam接收参数
- */
- public static String httpSyncGet(String baseUrl, String urlParams) {
-
- CloseableHttpClient httpClient = HttpClientFactory.getInstance().getHttpSyncClientPool().getHttpClient();
- HttpGet httpGet = new HttpGet(baseUrl);
- //Parameters
- LOG.warn("==== Parameters ======" + urlParams);
- CloseableHttpResponse response = null;
- try {
-
- if (null != urlParams || "".equals(urlParams)) {
-
- httpGet.setURI(new URI(httpGet.getURI().toString()
- + "?" + urlParams));
- } else {
- httpGet.setURI(new URI(httpGet.getURI().toString()));
- }
-
- response = httpClient.execute(httpGet);
- LOG.warn("========HttpResponseProxy:========" + response.getStatusLine());
- HttpEntity entity = response.getEntity();
- String result = null;
- if (entity != null) {
- result = EntityUtils.toString(entity, "UTF-8");
- LOG.warn("========Response=======" + result);
- }
- EntityUtils.consume(entity);
- return result;
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (response != null) {
- try {
- response.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- return null;
- }
-
-
- /**
- * 向指定的url发送一次get请求,参数是字符串
- *
- * @param baseUrl 请求地址
- * @return 返回结果, 请求失败时返回null
- * @apiNote http接口处用 @RequestParam接收参数
- */
- public static String httpSyncGet(String baseUrl) {
-
- CloseableHttpClient httpClient = HttpClientFactory.getInstance().getHttpSyncClientPool().getHttpClient();
- HttpGet httpGet = new HttpGet(baseUrl);
-
- CloseableHttpResponse response = null;
- try {
- httpGet.setURI(new URI(httpGet.getURI().toString()));
- response = httpClient.execute(httpGet);
- LOG.warn("========HttpResponseProxy:========" + response.getStatusLine());
- HttpEntity entity = response.getEntity();
- String result = null;
- if (entity != null) {
- result = EntityUtils.toString(entity, "UTF-8");
- LOG.warn("========Response=======" + result);
- }
- EntityUtils.consume(entity);
- return result;
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (response != null) {
- try {
- response.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- return null;
- }
-
-
- /**
- * 向指定的url发送一次异步post请求,参数是字符串
- *
- * @param baseUrl 请求地址
- * @param postString 请求参数,格式是json.toString()
- * @param urlParams 请求参数,格式是String
- * @param callback 回调方法,格式是FutureCallback
- * @return 返回结果, 请求失败时返回null
- * @apiNote http接口处用 @RequestParam接收参数
- */
- public static void httpAsyncPost(String baseUrl, String postString,
- String urlParams, FutureCallback callback) throws Exception {
- if (baseUrl == null || "".equals(baseUrl)) {
- LOG.warn("we don't have base url, check config");
- throw new Exception("missing base url");
- }
- CloseableHttpAsyncClient hc = HttpClientFactory.getInstance().getHttpAsyncClientPool()
- .getAsyncHttpClient();
- try {
- hc.start();
- HttpPost httpPost = new HttpPost(baseUrl);
-
-// httpPost.setHeader("Connection","close");
-
- if (null != postString) {
- LOG.debug("exeAsyncReq post postBody={" + postString + "}");
- StringEntity entity = new StringEntity(postString.toString(), utf8Charset);
- entity.setContentEncoding("UTF-8");
- entity.setContentType("application/json");
- httpPost.setEntity(entity);
- }
-
- if (null != urlParams) {
-
- httpPost.setURI(new URI(httpPost.getURI().toString()
- + "?" + urlParams));
- }
-
- LOG.warn("exeAsyncReq getparams:" + httpPost.getURI());
-
- hc.execute(httpPost, callback);
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
-
- /**
- * 向指定的url发送一次异步post请求,参数是字符串
- *
- * @param baseUrl 请求地址
- * @param urlParams 请求参数,格式是List<BasicNameValuePair>
- * @param callback 回调方法,格式是FutureCallback
- * @return 返回结果, 请求失败时返回null
- * @apiNote http接口处用 @RequestParam接收参数
- */
- public static void httpAsyncPost(String baseUrl, List<BasicNameValuePair> postBody,
- List<BasicNameValuePair> urlParams, FutureCallback callback) throws Exception {
- if (baseUrl == null || "".equals(baseUrl)) {
- LOG.warn("we don't have base url, check config");
- throw new Exception("missing base url");
- }
-
- try {
- CloseableHttpAsyncClient hc = HttpClientFactory.getInstance().getHttpAsyncClientPool()
- .getAsyncHttpClient();
-
- hc.start();
-
- HttpPost httpPost = new HttpPost(baseUrl);
-
-// httpPost.setHeader("Connection","close");
-
- if (null != postBody) {
- LOG.debug("exeAsyncReq post postBody={" + postBody + "}");
- UrlEncodedFormEntity entity = new UrlEncodedFormEntity(
- postBody, "UTF-8");
- httpPost.setEntity(entity);
- }
-
- if (null != urlParams) {
-
- String getUrl = EntityUtils
- .toString(new UrlEncodedFormEntity(urlParams));
-
- httpPost.setURI(new URI(httpPost.getURI().toString()
- + "?" + getUrl));
- }
-
- LOG.warn("exeAsyncReq getparams:" + httpPost.getURI());
- hc.execute(httpPost, callback);
-
- } catch (Exception e) {
- e.printStackTrace();
- }
+ public static BlockingQueue<String> getRetryQueue() {
+ return retryQueue;
}
/**
@@ -409,7 +84,7 @@ public class HttpClientUtil {
*/
public static void httpAsyncPostFileToZx(String baseUrl, byte[] fileIs, AsyncHttpClientPostFileCallback callback) throws Exception {
if (baseUrl == null || "".equals(baseUrl)) {
- LOG.warn("we don't have base url, check config");
+ logger.warn("we don't have base url, check config");
throw new Exception("missing base url");
}
@@ -433,7 +108,7 @@ public class HttpClientUtil {
if (null != fileIs) {
httpPut.setEntity(new ByteArrayEntity(fileIs));
}
- LOG.info("File httpPut.getURI()===>>>" + httpPut.getURI());
+ logger.info("File httpPut.getURI()===>>>" + httpPut.getURI());
localContext.setAttribute(HttpClientContext.COOKIE_STORE, cookieStore);
@@ -454,7 +129,7 @@ public class HttpClientUtil {
*/
public static void httpAsyncPostFileToZx(String baseUrl, byte[] fileIs, AsyncPostMailFilesCallback callback) throws Exception {
if (baseUrl == null || "".equals(baseUrl)) {
- LOG.warn("we don't have base url, check config");
+ logger.warn("we don't have base url, check config");
throw new Exception("missing base url");
}
@@ -478,7 +153,7 @@ public class HttpClientUtil {
if (null != fileIs) {
httpPut.setEntity(new ByteArrayEntity(fileIs));
}
- LOG.info("File httpPut.getURI()===>>>" + httpPut.getURI());
+ logger.info("File httpPut.getURI()===>>>" + httpPut.getURI());
localContext.setAttribute(HttpClientContext.COOKIE_STORE, cookieStore);
@@ -503,7 +178,7 @@ public class HttpClientUtil {
public static void httpAsyncGetFile(String baseUrl, FutureCallback callback) throws Exception {
if (baseUrl == null || "".equals(baseUrl)) {
- LOG.warn("we don't have base url, check config");
+ logger.warn("we don't have base url, check config");
throw new Exception("missing base url");
}
CloseableHttpAsyncClient hc = HttpClientFactory.getInstance().getHttpAsyncClientPool().getAsyncHttpClient();
@@ -523,7 +198,7 @@ public class HttpClientUtil {
// }
// LOG.info("exeAsyncReq getparams:" + httpGet.getURI());
- LOG.info("File httpGet.getURI()===>>>" + httpGet.getURI());
+ logger.info("File httpGet.getURI()===>>>" + httpGet.getURI());
hc.execute(httpGet, callback);
} catch (Exception e) {
e.printStackTrace();
@@ -545,7 +220,7 @@ public class HttpClientUtil {
public static void httpAsyncGet(String baseUrl, String urlParams, FutureCallback callback) throws Exception {
if (baseUrl == null || "".equals(baseUrl)) {
- LOG.warn("we don't have base url, check config");
+ logger.warn("we don't have base url, check config");
throw new Exception("missing base url");
}
CloseableHttpAsyncClient hc = HttpClientFactory.getInstance().getHttpAsyncClientPool()
@@ -567,7 +242,7 @@ public class HttpClientUtil {
httpGet.setURI(new URI(httpGet.getURI().toString()));
}
- LOG.warn("exeAsyncReq getparams:" + httpGet.getURI());
+ logger.warn("exeAsyncReq getparams:" + httpGet.getURI());
hc.execute(httpGet, callback);
@@ -590,7 +265,7 @@ public class HttpClientUtil {
*/
public static void httpAsyncGet(String baseUrl, List<BasicNameValuePair> urlParams, FutureCallback callback) throws Exception {
if (baseUrl == null || "".equals(baseUrl)) {
- LOG.warn("we don't have base url, check config");
+ logger.warn("we don't have base url, check config");
throw new Exception("missing base url");
}
@@ -613,7 +288,7 @@ public class HttpClientUtil {
+ "?" + getUrl));
}
- LOG.warn("exeAsyncReq getparams:" + httpGet.getURI());
+ logger.warn("exeAsyncReq getparams:" + httpGet.getURI());
hc.execute(httpGet, callback);
@@ -624,126 +299,6 @@ public class HttpClientUtil {
}
- public static String OkSyncPost(String url, String json) throws IOException {
-
- OkHttpClient okClient = HttpClientFactory.getInstance().getOkClientPool().getHttpClient();
-
- RequestBody body = RequestBody.create(JSON, json);
- Request request = new Request.Builder()
- .url(url)
- .post(body)
- .build();
- try (Response response = okClient.newCall(request).execute()) {
-
- return response.body().string();
- }
- }
-
- public static void OkAsyncPost(String url, String json) throws IOException {
- OkHttpClient okClient = HttpClientFactory.getInstance().getOkClientPool().getHttpClient();
-
- RequestBody body = RequestBody.create(JSON, json);
- Request request = new Request.Builder()
- .url(url)
- .post(body)
- .build();
- Call call = okClient.newCall(request);
- call.enqueue(new Callback() {
- @Override
- public void onFailure(Call call, IOException e) {
- e.printStackTrace();
- }
-
- @Override
- public void onResponse(Call call, Response response) throws IOException {
-
- LOG.warn("OkAsyncPost回调:" + response.body().string());
- }
- });
-
- }
-
-
- public static void OkAsyncPost(String url, Map<String, String> map) throws IOException {
- OkHttpClient okClient = HttpClientFactory.getInstance().getOkClientPool().getHttpClient();
-
- FormBody.Builder formBodyBuilder = new FormBody.Builder();
- for (Map.Entry<String, String> entry : map.entrySet()) {
- formBodyBuilder.add(entry.getKey(), entry.getValue());
- }
- Request request = new Request.Builder()
- .url(url)
- .post(formBodyBuilder.build())
- .build();
- Call call = okClient.newCall(request);
- call.enqueue(new Callback() {
- @Override
- public void onFailure(Call call, IOException e) {
- e.printStackTrace();
- }
-
- @Override
- public void onResponse(Call call, Response response) throws IOException {
-
- LOG.warn("OkAsyncPost回调:" + response.body().string());
- }
- });
-
- }
-
- public static void OkAsyncPost(String url, Map<String, String> map, Callback callback) throws IOException {
- OkHttpClient okClient = HttpClientFactory.getInstance().getOkClientPool().getHttpClient();
-
- FormBody.Builder formBodyBuilder = new FormBody.Builder();
- for (Map.Entry<String, String> entry : map.entrySet()) {
- formBodyBuilder.add(entry.getKey(), entry.getValue());
- }
-
- Request request = new Request.Builder()
- .url(url)
- .post(formBodyBuilder.build())
- .build();
- Call call = okClient.newCall(request);
- call.enqueue(callback);
-
- }
-
- public static String OkSyncGet(String url) throws IOException {
-
- OkHttpClient okClient = HttpClientFactory.getInstance().getOkClientPool().getHttpClient();
-
- Request request = new Request.Builder()
- .url(url)
- .build();
- try (Response response = okClient.newCall(request).execute()) {
-
- return response.body().string();
- }
- }
-
- public static void OkAsyncGet(String url) throws IOException {
-
- OkHttpClient okClient = HttpClientFactory.getInstance().getOkClientPool().getHttpClient();
-
- Request request = new Request.Builder()
- .url(url)
- .build();
- Call call = okClient.newCall(request);
- call.enqueue(new Callback() {
- @Override
- public void onFailure(Call call, IOException e) {
- e.printStackTrace();
- }
-
- @Override
- public void onResponse(Call call, Response response) throws IOException {
-
- LOG.warn("OkAsyncGet回调:" + response.body().string());
- }
- });
- }
-
-
/**
* 获取单个文件并传递单个文件时使用
*
@@ -759,15 +314,17 @@ public class HttpClientUtil {
httpAsyncGetFile(getFileUrl, callback);
// LOG.info(Thread.currentThread().getName() + "===>run====>>>" + "success,now time is" + System.currentTimeMillis());
} catch (Exception e) {
- LOG.error("getFileAndPostFile multithreading is error===>" + e);
+ logger.error("getFileAndPostFile multithreading is error===>" + e);
}
}
});
} catch (Exception e) {
- LOG.error("getFileAndPostFile method is error===>" + e);
+ logger.error("getFileAndPostFile method is error===>" + e);
}
}
+
+
/**
* 对消息数据进行批量组装
* @param contents
@@ -815,90 +372,66 @@ public class HttpClientUtil {
}
- public static void asyncProducerAvroMessageToZX(String urlProducer,String topic, byte[] results, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) throws IOException {
+ /**
+ * 用于测试总线的同步发送客户端接口,不作为生产环境使用
+ * @param urlProducer
+ * @param topic
+ * @param results
+ * @param dataJson
+ * @param userAgent
+ * @param msgSessionCookie
+ * @param count
+ * @param postTime
+ * @throws IOException
+ */
+ public static void syncProducerAvroMessageTest(String urlProducer,String topic, byte[] results, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) throws IOException {
HttpClientTest.producerAvroMessageToZX(urlProducer, topic, results, dataJson, userAgent, msgSessionCookie, count, postTime);
}
- public static void asyncProducerAvroMessageToZX_toBatch(String urlProducer,String topic, byte[] results, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) {
- HttpPost httpPost = null;
- urlProducer = urlProducer.trim();
- CloseableHttpAsyncClient httpClient = null;
- try {
- httpClient = HttpClientFactory.getInstance().getHttpAsyncClientPool().getAsyncHttpClient();
- httpClient.start();
- httpPost = new HttpPost(urlProducer);
- httpPost.addHeader("User-Agent", userAgent);
- httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE);
- httpPost.addHeader("Cookie", msgSessionCookie);
- String md5Avro = MD5Utils.md5Encode(results);
- httpPost.addHeader("Checksum", md5Avro);
- logger.info("批量发送body Checksum MD5 为:" + md5Avro);
- httpPost.addHeader("Content-Type", "binary/octet-stream");
- httpPost.addHeader("X-Tag", "getXTAG(dataJson, topic)");
- ByteArrayEntity payload = new ByteArrayEntity(results);
- payload.setContentEncoding("utf-8");
- httpPost.setEntity(payload);
- AsyncPostMsgCallBack asyncPostMsgCallBack = new AsyncPostMsgCallBack(urlProducer, topic, dataJson, userAgent, msgSessionCookie, count, postTime);
- httpClient.execute(httpPost, asyncPostMsgCallBack);
- logger.info("当前Thread number ID :" + Thread.currentThread().getId());
- } catch (MalformedURLException e) {
- //执行URL url = new URL()的异常
- e.printStackTrace();
- } catch (ClientProtocolException e) {
- // 执行httpClient.execute(httpGet)的异常
- e.printStackTrace();
- } catch (IOException e) {
- // 执行httpClient.execute(httpGet)的异常
- logger.error("asyncProducerAvroToZX is IOException===>>>" + e.getMessage() + "<<<===IOException Message is==>" + dataJson + "<==");//进一步细化日志
- e.printStackTrace();
- } catch (Exception e) {
- //handle response here... try other servers
- logger.error("asyncProducerAvroToZX is Exception===>>>" + e.getMessage() + "<<<===Exception Message is==>" + dataJson + "<==");//进一步细化日志
- e.printStackTrace();
- }
-
- }
-
-
/**
- * 批量生产消息-总线
+ * 异步批量发送接口
+ * @param batchProduceUrl
+ * @param topic
+ * @param results
+ * @param xTag
+ * @param userAgent
+ * @param msgSessionCookie
+ * @param count
+ * @param postTime
*/
- public static void asyncProducerAvroMessageToZX_bk(String urlProducer,String topic, byte[] results, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) {
+ public static void batchAsyncProduceMessage(String batchProduceUrl, String topic,
+ byte[] results,
+ String xTag, String userAgent,
+ String msgSessionCookie, int count, long postTime) {
HttpPost httpPost = null;
- urlProducer = urlProducer.trim();
+ batchProduceUrl = batchProduceUrl.trim();
CloseableHttpAsyncClient httpClient = null;
try {
httpClient = HttpClientFactory.getInstance().getHttpAsyncClientPool().getAsyncHttpClient();
httpClient.start();
- httpPost = new HttpPost(urlProducer);
+ httpPost = new HttpPost(batchProduceUrl);
httpPost.addHeader("User-Agent", userAgent);
httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE);
httpPost.addHeader("Cookie", msgSessionCookie);
- String md5Avro = MD5Utils.md5Encode(results);
- httpPost.addHeader("Checksum", md5Avro);
- logger.info("批量发送body Checksum MD5 为:" + md5Avro);
+ String checksum = MD5Utils.md5Encode(results);
+ httpPost.addHeader("Checksum", checksum);
httpPost.addHeader("Content-Type", "binary/octet-stream");
- httpPost.addHeader("X-Tag", getXTAG(dataJson, topic));
+ httpPost.addHeader("X-Tag", xTag);
ByteArrayEntity payload = new ByteArrayEntity(results);
payload.setContentEncoding("utf-8");
httpPost.setEntity(payload);
- AsyncPostMsgCallBack asyncPostMsgCallBack = new AsyncPostMsgCallBack(urlProducer, topic, dataJson, userAgent, msgSessionCookie, count, postTime);
- httpClient.execute(httpPost, asyncPostMsgCallBack);
- logger.info("当前Thread number ID :" + Thread.currentThread().getId());
+ AsyncBatchMsgCallBack asyncBatchMsgCallBack = new AsyncBatchMsgCallBack(batchProduceUrl, topic, checksum, count, postTime);
+ httpClient.execute(httpPost, asyncBatchMsgCallBack);
} catch (MalformedURLException e) {
- //执行URL url = new URL()的异常
e.printStackTrace();
} catch (ClientProtocolException e) {
- // 执行httpClient.execute(httpGet)的异常
e.printStackTrace();
} catch (IOException e) {
- // 执行httpClient.execute(httpGet)的异常
- logger.error("asyncProducerAvroToZX is IOException===>>>" + e.getMessage() + "<<<===IOException Message is==>" + dataJson + "<==");//进一步细化日志
+ logger.error("IOException===>>>" + e.getMessage() + "<<<===IOException Message is==>" + batchProduceUrl + "<==");
e.printStackTrace();
} catch (Exception e) {
- //handle response here... try other servers
- logger.error("asyncProducerAvroToZX is Exception===>>>" + e.getMessage() + "<<<===Exception Message is==>" + dataJson + "<==");//进一步细化日志
+ logger.error("Other Exception===>>>" + e.getMessage() + "<<<===Exception Message is==>" + batchProduceUrl + "<==");
e.printStackTrace();
}
@@ -909,16 +442,18 @@ public class HttpClientUtil {
* 生产AVRO数据入ZX(单条)--数据不包含schema
* 静态,适用于异步与多线程的版本
*
- * @param urlProducer
+ * @param singleProduceUrl
* @param topic
* @param dataJson
* @param userAgent
* @param msgSessionCookie
* @return
*/
- public static void asyncProducerAvroToZX(String urlProducer, String topic, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) {
+ public static void singleAsyncProduceMessage(String singleProduceUrl, String topic,
+ String dataJson, String userAgent,
+ String msgSessionCookie, int count, long postTime) {
HttpPost httpPost = null;
- urlProducer = urlProducer.trim();
+ singleProduceUrl = singleProduceUrl.trim();
byte[] resultArray = null;
CloseableHttpAsyncClient httpClient = null;
try {
@@ -961,7 +496,7 @@ public class HttpClientUtil {
resultArray = alreadyGetFileTagRecordSoOnlyGetMergeAllArray(topic, resultArray);
}
- httpPost = new HttpPost(urlProducer);
+ httpPost = new HttpPost(singleProduceUrl);
// set header
httpPost.addHeader("User-Agent", userAgent);
@@ -974,16 +509,11 @@ public class HttpClientUtil {
httpPost.addHeader("Cookie", msgSessionCookie);//不设置Cookie时,广东测试出现报错,打开广东的Cookie设置测试一下,经测试,可用
- try {
- String md5Avro = MD5Utils.md5Encode(resultArray);
- httpPost.addHeader("Checksum", md5Avro);
- logger.info("请求端Checksum MD5 avro 加密为:" + md5Avro);
- logger.debug("原始数据==>" + dataJson + "<==," +
- "原始数据checksum==>" + MD5Utils.md5Encode(dataJson) + "<==," +
- "对应请求端Checksum MD5 avro 加密为:" + md5Avro);//20200521新增,自证清白
- } catch (Exception e) {
- logger.error("MD5Utils.md5Encode Method is error===>>> " + e);
- }
+
+ String md5Avro = MD5Utils.md5Encode(resultArray);
+ httpPost.addHeader("Checksum", md5Avro);
+
+
// httpPost.addHeader("Content-Type", "application/avro+json;charset=UTF-8");
httpPost.addHeader("Content-Type", "binary/octet-stream");
@@ -999,22 +529,16 @@ public class HttpClientUtil {
ByteArrayEntity payload = new ByteArrayEntity(resultArray);
payload.setContentEncoding("utf-8");
//payload.setContentType("text/xml; charset=UTF-8");
- // anti avro
httpPost.setEntity(payload);
-
-
- logger.info("最终加载内容字节数组长度: " + resultArray.length);
-// logger.debug("封装数据==>" + dataJson + "<==最终加载内容字节数组长度: " + resultArray.length);//20200428进一步细化日志
- logger.debug("原始数据==>" + dataJson + "<==," +
+// logger.debug("封装数据==>" + dataJson + "<==最终加载内容字节数组长度: " + resultArray.length);//20200428进一步细化日志
+ logger.info("原始数据==>" + dataJson + "<==," +
"原始数据checksum==>" + MD5Utils.md5Encode(dataJson) + "<==," +
"数据处理时间handleTime==>" + (System.currentTimeMillis() - postTime) + "<==," +
"最终加载内容字节数组长度: " + resultArray.length + "," +
- "最终加载内容字节数组:" + Arrays.toString(resultArray));//20200521进一步细化日志
-
+ "对应请求端Checksum MD5 avro 加密为:" + md5Avro);//20200521进一步细化日志
//执行请求
- AsyncPostMsgCallBack asyncPostMsgCallBack = new AsyncPostMsgCallBack(urlProducer, topic, dataJson, userAgent, msgSessionCookie, count, postTime);
+ AsyncSingleMsgCallBack asyncPostMsgCallBack = new AsyncSingleMsgCallBack(singleProduceUrl, topic, dataJson, userAgent, msgSessionCookie, count, postTime);
httpClient.execute(httpPost, asyncPostMsgCallBack);
-
} catch (MalformedURLException e) {
//执行URL url = new URL()的异常
e.printStackTrace();
@@ -1023,11 +547,13 @@ public class HttpClientUtil {
e.printStackTrace();
} catch (IOException e) {
// 执行httpClient.execute(httpGet)的异常
- logger.error("asyncProducerAvroToZX is IOException===>>>" + e.getMessage() + "<<<===IOException Message is==>" + dataJson + "<==");//进一步细化日志
+ logger.error("asyncProducerAvroToZX is IOException===>>>" + e.getMessage()
+ + "<<<===IOException Message is==>" + dataJson + "<==");//进一步细化日志
e.printStackTrace();
} catch (Exception e) {
//handle response here... try other servers
- logger.error("asyncProducerAvroToZX is Exception===>>>" + e.getMessage() + "<<<===Exception Message is==>" + dataJson + "<==");//进一步细化日志
+ logger.error("asyncProducerAvroToZX is Exception===>>>"
+ + e.getMessage() + "<<<===Exception Message is==>" + dataJson + "<==");//进一步细化日志
e.printStackTrace();
}
}
@@ -1049,8 +575,7 @@ public class HttpClientUtil {
}
}
- private static HashMap<String, Schema> schemaHashMap = new HashMap<String, Schema>();//用于存放Schema
- private static Logger logger = Logger.getLogger(HttpClientUtil.class);
+
/**
* 获取数据中的日志标签并将所有相关数据字节数组化后拼接返回
diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/mail/AsyncGetMailFilesCallback.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/mail/AsyncGetMailFilesCallback.java
index 9414163..6e04098 100644
--- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/mail/AsyncGetMailFilesCallback.java
+++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/mail/AsyncGetMailFilesCallback.java
@@ -52,7 +52,7 @@ public class AsyncGetMailFilesCallback implements FutureCallback<HttpResponse> {
public AsyncGetMailFilesCallback(ConfigInfo configInfo, String getFileUrl, String sendMsg, AsyncPostMailFilesCallback asyncPostMailFilesCallback, int count, int urlCount, int dealUrlCount, LinkedList<String> attachmentsUrl) {
this.configInfo = configInfo;
this.postFileUrl = configInfo.getPostFileUrl();//通过configInfo赋值
- this.postMsgUrl = configInfo.getPostMsgUrl();//通过configInfo赋值
+ this.postMsgUrl = configInfo.getSingleProduceUrl();//通过configInfo赋值
this.getFileUrl = getFileUrl;//初次存储的是eml_file_url,后续每次存储的都是当次需要请求的文件路径
this.sendMsg = sendMsg;
this.asyncPostMailFilesCallback = asyncPostMailFilesCallback;
diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/mail/AsyncPostMailFilesCallback.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/mail/AsyncPostMailFilesCallback.java
index ccf0729..1148ce1 100644
--- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/mail/AsyncPostMailFilesCallback.java
+++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/mail/AsyncPostMailFilesCallback.java
@@ -65,7 +65,7 @@ public class AsyncPostMailFilesCallback implements FutureCallback<HttpResponse>
public AsyncPostMailFilesCallback(ConfigInfo configInfo, String sendMsg, int count, int isCount, int dealIsCount, LinkedList<String> attachmentsIdList) {
this.configInfo = configInfo;
this.postFileUrl = configInfo.getPostFileUrl();
- this.postMsgUrl = configInfo.getPostMsgUrl();
+ this.postMsgUrl = configInfo.getSingleProduceUrl();
this.sendMsg = sendMsg;//存放对应于url的数据
this.count = count;//初始为0
this.isCount = isCount;//初始总数为url总数
@@ -275,7 +275,7 @@ public class AsyncPostMailFilesCallback implements FutureCallback<HttpResponse>
AvroMonitorTimerTask.msgReadyPostSum++;//多个文件对应一条消息
//开始推送消息进入总线
- ProResBody proResBody = dclAsyncPost.avroDataLoad(configInfo.getPostMsgUrl(), configInfo.getTopicName(), sendMsg, configInfo.getBatchSize(), configInfo.getUserAgent(), configInfo.getMsgSessionCookie());
+ ProResBody proResBody = dclAsyncPost.avroDataLoad(configInfo.getSingleProduceUrl(), configInfo.getTopicName(), sendMsg, configInfo.getBatchSize(), configInfo.getUserAgent(), configInfo.getMsgSessionCookie());
LOG.info("Send message with many fileId to zx over,this responseBody is===>" + proResBody.toString());
} else if (dealIsCount < isCount) {//继续发送余下文件流获取id
// 注意此处dealIsCount必然是>=1的,所以不需要考虑eml_file,因为eml_file必然已经完成,直接发送attachmentsResultIsList中的流获取id
diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncBatchMsgCallBack.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncBatchMsgCallBack.java
new file mode 100644
index 0000000..43c8eab
--- /dev/null
+++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncBatchMsgCallBack.java
@@ -0,0 +1,206 @@
+package cn.ac.iie.cusflume.sink.HttpAsyncUtils.msgCallBack;
+
+import cn.ac.iie.cusflume.sink.HttpAsyncUtils.HttpClientUtil;
+import cn.ac.iie.cusflume.sink.YbHttpAvroSinkFile;
+import cn.ac.iie.cusflume.sink.avroUtils.AvroMonitorTimerTask;
+import cn.ac.iie.cusflume.sink.bean.redirectBean.ResRedirBody;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.utils.HttpClientUtils;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.util.EntityUtils;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * 被回调的对象,给异步的httpclient使用
+ */
+public class AsyncBatchMsgCallBack implements FutureCallback<HttpResponse> {
+ private static Logger logger = Logger.getLogger(AsyncBatchMsgCallBack.class);
+
+ private String batchProduceUrl;
+ private String topicName;
+ private String checksum;
+ private int count;
+ private long postTime;
+
+ public AsyncBatchMsgCallBack(String batchProduceUrl, String topicName,
+ String checksum, int count, long postTime) {
+ this.batchProduceUrl = batchProduceUrl;
+ this.topicName = topicName;
+ this.count = count;
+ this.postTime = postTime;
+ this.checksum = checksum;
+ }
+
+
+ public String getBatchProduceUrl() {
+ return batchProduceUrl;
+ }
+
+ public void setBatchProduceUrl(String batchProduceUrl) {
+ this.batchProduceUrl = batchProduceUrl;
+ }
+
+ public String getChecksum() {
+ return checksum;
+ }
+
+ public void setChecksum(String checksum) {
+ this.checksum = checksum;
+ }
+
+ public String getTopicName() {
+ return topicName;
+ }
+
+ public void setTopicName(String topicName) {
+ this.topicName = topicName;
+ }
+
+ public int getCount() {
+ return count;
+ }
+
+ public void setCount(int count) {
+ this.count = count;
+ }
+
+ public long getPostTime() {
+ return postTime;
+ }
+
+ public void setPostTime(long postTime) {
+ this.postTime = postTime;
+ }
+
+ /**
+ * 请求完成后调用该函数
+ */
+ @Override
+ public void completed(HttpResponse response) {
+ try {
+ int statusCode = response.getStatusLine().getStatusCode();
+
+ HttpEntity entity = response.getEntity();
+
+ String result = null;
+ if (entity != null) {
+ result = EntityUtils.toString(entity, "UTF-8");
+ }
+
+ /**
+ * 不直接进行对象转换,除非数据加载不成功
+ */
+ Map map = JSONObject.parseObject(result, Map.class);
+
+ int resRedirBodyCode = 0;
+
+ if (map != null) {
+ resRedirBodyCode = (int) map.get("code");
+ }
+
+ /**
+ * 20200818-接口细化响应码
+ */
+ if (statusCode == 200 && resRedirBodyCode == 200) {
+ logger.info(
+ "数据加载成功,返回码: " + statusCode +
+ "生产数据checksum==>" + checksum + "<==," +
+ "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==," +
+ "数据加载成功,返回码: " + statusCode);
+ AvroMonitorTimerTask.addSuccessNum(count);
+ } else {
+ AvroMonitorTimerTask.addFailedNum(count);
+ switch (resRedirBodyCode) {
+ case 300:
+ logger.error("AsyncPostBatchMsgCallBack==>重定向响应体-redirect-ret==>" + result
+ + "总线批量发送URL==>" + batchProduceUrl
+ + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==,"
+ + "<==,Status Code:" + statusCode + ",resRedirBodyCode:300,当前服务节点负载过高,将向其他通信地址发送请求.");
+
+ //若不包含对应字段,则不进行对象转换,减少报错
+ if (result.contains("redirect")) {
+ ResRedirBody resRedirBody = JSONObject.parseObject(result, ResRedirBody.class);
+ String redirectUrl = resRedirBody.getData().getRedirect();
+ if (StringUtils.isNotBlank(redirectUrl)) {
+ YbHttpAvroSinkFile.changeUrl(redirectUrl);
+ }
+ } else {
+ logger.error("AsyncPostBatchMsgCallBack==>服务端响应体中ResRedirBody.data.redirect字段不存在或格式不正确!!!");
+ }
+
+ break;
+ case 301:
+ logger.error("AsyncPostBatchMsgCallBack==>Status Code:" + statusCode
+ + "总线批量发送URL==>" + batchProduceUrl
+ + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==,"
+ + ",resRedirBodyCode:301,当前所有服务端节点均负载过高,暂无可用资源,请等待.");
+ break;
+ case 410:
+ logger.error("AsyncPostBatchMsgCallBack==>Status Code:" + statusCode
+ + "总线批量发送URL==>" + batchProduceUrl
+ + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==,"
+ + ",resRedirBodyCode:410,Cookie已过期或Cookie错误,将开始更新Cookie.");
+ YbHttpAvroSinkFile.updateCookie();
+ break;
+ case 500:
+ logger.error("AsyncPostBatchMsgCallBack==>Status Code:" + statusCode
+ + "总线批量发送URL==>" + batchProduceUrl
+ + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==,"
+ + ",resRedirBodyCode:500,处理请求过程出现系统错误.");
+ YbHttpAvroSinkFile.updateCookie();
+ break;
+ default:
+ logger.error("AsyncPostBatchMsgCallBack==>数据加载失败,响应体:" + result
+ + "总线批量发送URL==>" + batchProduceUrl
+ + "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==,"
+ + "---Status Code:" + statusCode + "---resRedirBodyCode:" + resRedirBodyCode);
+ break;
+ }
+ }
+
+ if (entity != null) {
+ try {
+ EntityUtils.consume(entity);
+ } catch (final IOException ex) {
+ logger.error("IOException : " + ex);
+ }
+ }
+
+ } catch (Exception e) {
+ logger.error("AsyncPostBatchMsgCallBack Get response from ZX is error===>>>"
+ + e.getMessage());
+ e.printStackTrace();
+ } finally {
+ HttpClientUtils.closeQuietly(response);
+ }
+ }
+
+ /**
+ * 请求取消后调用该函数
+ */
+ @Override
+ public void cancelled() {
+ logger.error("AsyncPostMagCallBack Request is cancelled");
+ }
+
+ /**
+ * 请求失败后调用该函数
+ */
+ @Override
+ public void failed(Exception e) {
+ logger.error("AsyncPostMagCallBack Request is Failed,This Failed data checksum is ==>"+ checksum);
+ }
+
+}
+
+
+
+
+
+
diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncPostMsgCallBack.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncSingleMsgCallBack.java
index 810e668..2d7eb33 100644
--- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncPostMsgCallBack.java
+++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/HttpAsyncUtils/msgCallBack/AsyncSingleMsgCallBack.java
@@ -15,14 +15,13 @@ import org.apache.http.util.EntityUtils;
import org.apache.log4j.Logger;
import java.io.IOException;
-import java.util.Arrays;
import java.util.Map;
/**
* 被回调的对象,给异步的httpclient使用
*/
-public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
- private static Logger logger = Logger.getLogger(AsyncPostMsgCallBack.class);
+public class AsyncSingleMsgCallBack implements FutureCallback<HttpResponse> {
+ private static Logger logger = Logger.getLogger(AsyncSingleMsgCallBack.class);
private String postMsgUrl;
private String topicName;
@@ -32,7 +31,7 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
private int count;
private long postTime;
- public AsyncPostMsgCallBack(String postMsgUrl, String topicName, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) {
+ public AsyncSingleMsgCallBack(String postMsgUrl, String topicName, String dataJson, String userAgent, String msgSessionCookie, int count, long postTime) {
this.postMsgUrl = postMsgUrl;
this.topicName = topicName;
this.dataJson = dataJson;
@@ -103,7 +102,6 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
*/
@Override
public void completed(HttpResponse response) {
-// ProResBody proResBody = null;
try {
int statusCode = response.getStatusLine().getStatusCode();
HttpEntity entity = response.getEntity();
@@ -121,11 +119,7 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
if (map != null) {
resRedirBodyCode = (int) map.get("code");
}
-// int resRedirBodyCode = resRedirBody.getCode();
- /* logger.debug("生产数据==>" + dataJson + "<==," +
- "生产数据checksum==>" + MD5Utils.md5Encode(dataJson) + "<==," +
- "返回的生产原始响应体String数据为:" + ret);*/
/**
* 20200818-接口细化响应码
*/
@@ -137,12 +131,11 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
"数据加载成功,返回码: " + statusCode);
AvroMonitorTimerTask.addSuccessNum(count);
-
} else {
AvroMonitorTimerTask.addFailedNum(count);
switch (resRedirBodyCode) {
case 300:
- logger.info("AsyncPostMsgCallBack==>重定向响应体-redirect-ret==>" + ret
+ logger.error("AsyncSingleMsgCallBack==>重定向响应体-redirect-ret==>" + ret
+ "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==,"
+ "<==,Status Code:" + statusCode + ",resRedirBodyCode:300,当前服务节点负载过高,将向其他通信地址发送请求.");
//若不包含对应字段,则不进行对象转换,减少报错
@@ -153,33 +146,35 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
YbHttpAvroSinkFile.changeUrl(redirectUrl);
}
} else {
- logger.error("AsyncPostMsgCallBack==>服务端响应体中ResRedirBody.data.redirect字段不存在或格式不正确!!!");
+ logger.error("AsyncSingleMsgCallBack==>服务端响应体中ResRedirBody.data.redirect字段不存在或格式不正确!!!");
}
- YbHttpAvroSinkFile.redirectContents.add(dataJson);
+ HttpClientUtil.getRetryQueue().put(dataJson);
break;
case 301:
- logger.info("AsyncPostMsgCallBack==>Status Code:" + statusCode
+ logger.error("AsyncSingleMsgCallBack==>Status Code:" + statusCode
+ "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==,"
+ ",resRedirBodyCode:301,当前所有服务端节点均负载过高,暂无可用资源,请等待.");
- YbHttpAvroSinkFile.redirectContents.add(dataJson);
+ HttpClientUtil.getRetryQueue().put(dataJson);
break;
case 410:
- logger.info("AsyncPostMsgCallBack==>Status Code:" + statusCode
+ logger.error("AsyncSingleMsgCallBack==>Status Code:" + statusCode
+ "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==,"
+ ",resRedirBodyCode:410,Cookie已过期或Cookie错误,将开始更新Cookie.");
YbHttpAvroSinkFile.updateCookie();
break;
case 500:
- logger.info("AsyncPostMsgCallBack==>Status Code:" + statusCode
+ logger.error("AsyncSingleMsgCallBack==>Status Code:" + statusCode
+ "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==,"
+ ",resRedirBodyCode:500,处理请求过程出现系统错误.");
YbHttpAvroSinkFile.updateCookie();
break;
default:
- logger.error("AsyncPostMsgCallBack==>数据加载失败,响应体:" + ret
+
+ logger.error("AsyncSingleMsgCallBack==>数据加载失败,响应体:" + ret
+ "服务端响应时间(ms)==>" + (System.currentTimeMillis() - postTime) + "<==,"
+ "---Status Code:" + statusCode + "---resRedirBodyCode:" + resRedirBodyCode);
+
break;
}
}
@@ -188,11 +183,12 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
try {
EntityUtils.consume(entity);
} catch (final IOException ex) {
+ logger.error("IOException : " + ex);
}
}
} catch (Exception e) {
- logger.error("AsyncPostMsgCallBack Get response from ZX is error===>>>" + e.getMessage() + "<<<===Message is==>" + dataJson + "<==");//细化日志
+ logger.error("AsyncSingleMsgCallBack Get response from ZX is error===>>>" + e.getMessage() + "<<<===Message is==>" + dataJson + "<==");//细化日志
e.printStackTrace();
} finally {
HttpClientUtils.closeQuietly(response);
@@ -204,7 +200,7 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
*/
@Override
public void cancelled() {
- logger.error("AsyncPostMagCallBack Request is cancelled");
+ logger.error("AsyncSingleMsgCallBack Request is cancelled");
}
/**
@@ -212,15 +208,8 @@ public class AsyncPostMsgCallBack implements FutureCallback<HttpResponse> {
*/
@Override
public void failed(Exception e) {
- count++;
- logger.info("AsyncPostMagCallBack Request is Failed,This Failed data is ==>" + dataJson + "<==,Retry count=" + count);
- if (count > 1) {
-
- AvroMonitorTimerTask.addFailedNum(1);
- logger.error("dataJson:" + dataJson + " send failed finally,error:" + e.toString());
- } else {
- HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, dataJson, userAgent, msgSessionCookie, count, System.currentTimeMillis());//failed失败时重试
- }
+ logger.error("AsyncSingleMsgCallBack Request is Failed,This Failed data is ==>"
+ + dataJson );
}
}
diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/SinkService.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/SinkService.java
deleted file mode 100644
index e550f3f..0000000
--- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/SinkService.java
+++ /dev/null
@@ -1,139 +0,0 @@
-package cn.ac.iie.cusflume.sink;
-
-import cn.ac.iie.cusflume.sink.CommonUtils.SinkHttpClientUtil;
-import cn.ac.iie.cusflume.sink.avroUtils.AvroMonitorTimerTask;
-import cn.ac.iie.cusflume.sink.avroUtils.MD5Utils;
-import cn.ac.iie.cusflume.sink.bean.redirectBean.ResRedirBody;
-import com.alibaba.fastjson.JSONObject;
-import org.apache.commons.lang.StringUtils;
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpStatus;
-import org.apache.http.ParseException;
-import org.apache.http.client.ClientProtocolException;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.ByteArrayEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.protocol.HTTP;
-import org.apache.http.util.EntityUtils;
-import org.apache.log4j.Logger;
-import org.mortbay.log.Log;
-
-import java.io.IOException;
-
-public class SinkService {
- private static final SinkService sinkService = new SinkService();
-
- private SinkService() {
- }
- //SinkHttpClientUtil.getHttpClient();
- public static CloseableHttpClient httpClient = null;
- private static Logger LOG = Logger.getLogger(SinkHttpClientUtil.class);
-
- public static SinkService getInstance() {
- return sinkService;
- }
-
- public void producerAvroMessageToBus(
- String urlProducer, String topic, byte[] results,
- String dataJson, String userAgent, String msgSessionCookie, int batchSize) {
-
- httpClient = SinkHttpClientUtil.getHttpClient();
- HttpPost httpPost = null;
- urlProducer = urlProducer.trim();
- String msg = "-1";
- // 创建POST请求对象
- CloseableHttpResponse response = null;
- try {
- httpPost = new HttpPost(urlProducer);
- httpPost.addHeader("User-Agent", userAgent);
- httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE);
- httpPost.addHeader("Cookie", msgSessionCookie);
- String md5Avro = MD5Utils.md5Encode(results);
- httpPost.addHeader("Checksum", md5Avro);
- LOG.info("批量发送body Checksum MD5 为:" + md5Avro);
- httpPost.addHeader("Content-Type", "binary/octet-stream");
- httpPost.addHeader("X-Tag", cn.ac.iie.cusflume.sink.HttpAsyncUtils.HttpClientUtil.getXTAG(dataJson, topic));
- ByteArrayEntity payload = new ByteArrayEntity(results);
- payload.setContentEncoding("utf-8");
- httpPost.setEntity(payload);
- long startTime = System.currentTimeMillis();
- response = httpClient.execute(httpPost);
- int statusCode = response.getStatusLine().getStatusCode();
- // 获取响应实体
- HttpEntity entity = response.getEntity();
- // 获取响应信息
- msg = EntityUtils.toString(entity, "UTF-8");
- Log.warn(Thread.currentThread().getName() + ",请求响应用时(ms):" + (System.currentTimeMillis() - startTime));
- if (statusCode != HttpStatus.SC_OK) {
- AvroMonitorTimerTask.addSuccessNum(batchSize);
- LOG.info("数据总线响应内容:" + msg);
- } else {
- AvroMonitorTimerTask.addFailedNum(batchSize);
- LOG.error("数据总线反馈出错:" + statusCode + ",msg" + msg);
-
- switch (statusCode) {
- case 300:
- LOG.info("AsyncPostMsgCallBack==>重定向响应体-redirect-ret==>"
- + ",resRedirBodyCode:300,当前服务节点负载过高,将向其他通信地址发送请求.");
- //若不包含对应字段,则不进行对象转换,减少报错
- if (msg.contains("redirect")) {
- ResRedirBody resRedirBody = JSONObject.parseObject(msg, ResRedirBody.class);
- String redirectUrl = resRedirBody.getData().getRedirect();
- if (StringUtils.isNotBlank(redirectUrl)) {
- YbHttpAvroSinkFile.changeUrl(redirectUrl);
- }
- } else {
- LOG.error("AsyncPostMsgCallBack==>服务端响应体中ResRedirBody.data.redirect字段不存在或格式不正确!!!");
- }
-
-
- break;
- case 301:
- LOG.error("AsyncPostMsgCallBack==>Status Code:" + statusCode +
- ",resRedirBodyCode:301,当前所有服务端节点均负载过高,暂无可用资源,请等待.");
- break;
- case 410:
- LOG.error("AsyncPostMsgCallBack==>Status Code:" + statusCode
- + "服务端响应时间(ms)==>" + "<==,"
- + ",resRedirBodyCode:410,Cookie已过期或Cookie错误,将开始更新Cookie.");
- YbHttpAvroSinkFile.updateCookie();
- break;
- case 500:
- LOG.error("AsyncPostMsgCallBack==>Status Code:" + statusCode
- + "服务端响应时间(ms)==>" + "<==,"
- + ",resRedirBodyCode:500,处理请求过程出现系统错误.");
- YbHttpAvroSinkFile.updateCookie();
- break;
- default:
- LOG.error("AsyncPostMsgCallBack==>数据加载失败,响应体:" + "<==,"
- + "---Status Code:" + statusCode);
- break;
- }
-
- }
-
-
- } catch (ClientProtocolException e) {
- LOG.error("协议错误: {}", e);
- } catch (ParseException e) {
- LOG.error("解析错误: {}", e);
- } catch (IOException e) {
- LOG.error("IO错误: {}", e);
- } catch (Exception e) {
- LOG.error("其它错误: {}", e);
- } finally {
- if (null != response) {
- try {
- EntityUtils.consumeQuietly(response.getEntity());
- response.close();
- } catch (IOException e) {
- LOG.error("释放链接错误: {}", e);
- }
- }
- }
-
- }
-
-}
-
diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java
index 2ca287e..63a6d72 100644
--- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java
+++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/YbHttpAvroSinkFile.java
@@ -16,20 +16,16 @@ import org.apache.flume.sink.AbstractSink;
import org.apache.log4j.Logger;
import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
private static Logger logger = Logger.getLogger(YbHttpAvroSinkFile.class);
-
protected static ExecutorService pool = Executors.newFixedThreadPool(RealtimeCountConfig.HTTP_ASYNC_PARALLELISM);
- private static ConcurrentLinkedQueue<byte[]> concurrentLinkedQueue = new ConcurrentLinkedQueue<byte[]>();
private static DataCenterLoad dcl;
- private static String postMsgUrl;//发送消息路径,配置文件获取,发送文件与发送消息皆需要
+ private static String singleProduceUrl;//单条发送接口
+ private static String batchProduceUrl; //批量发送接口
private String postFileUrl;//发送文件路径,配置文件获取,仅发送文件时需要---若只发送消息,则此路径与postMsgUrl设置相同即可
@@ -61,14 +57,9 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
private static boolean checkTimerStart = false;//定时获取Cookie启动器
- private static boolean redirectContentsPostStart = false;//定时post重定向数据集合
- private static int batchInsertNum = RealtimeCountConfig.BATCH_INSERT_NUM;
- /**
- * 用于存储由于服务器资源不足所造成的未发送数据
- */
- public static List<String> redirectContents;
+
/**
* 用于存放验证以及连接的url的各组成部分,方便调取
@@ -84,35 +75,45 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
super.start();
dcl = new DataCenterLoad();
- redirectContents = new ArrayList<>();//初始化
/**
* 拆解初始化获取的url后缀,用于填充urlToolHm,用于后续动态负载均衡中的url变更
*/
- if (StringUtils.isNotBlank(checkMsgUrl) && StringUtils.isNotBlank(postMsgUrl)) {
+ if (StringUtils.isNotBlank(checkMsgUrl)
+ && StringUtils.isNotBlank(singleProduceUrl)
+ && StringUtils.isNotBlank(batchProduceUrl)) {
urlToolHm = new HashMap<>();
makeUrlSplitMap(checkMsgUrl, "check");
- makeUrlSplitMap(postMsgUrl, "post");
+ makeUrlSplitMap(singleProduceUrl, "post_single");
+ makeUrlSplitMap(batchProduceUrl, "post_batch");
+
} else {
logger.error("Starting YbHttpAvroSinkFile is error==>checkMsgUrl and postMsgUrl can not be null!!!!");
}
- logger.warn("启动Sink File 执行程序 ==============");
- //new Thread(new Consumer()).start();
- logger.warn("开启多线程消费队列数据==================");
-
+ new Thread(new RetrySendMessage()).start();
logger.warn("Starting YbHttpAvroSinkFile ... ...");
}
@Override
public void configure(Context context) {
try {
- postMsgUrl = context.getString("postMsgUrl", "");
- Preconditions.checkNotNull("".equals(postMsgUrl), "postMsgUrl must be set!!");
- logger.info("Read Post Message URL from configuration : " + postMsgUrl);
+ singleProduceUrl = context.getString("singleProduceUrl", "");
+ Preconditions.checkNotNull("".equals(singleProduceUrl), "singleProduceUrl must be set!!");
+ logger.info("Read Post Message URL from configuration : " + singleProduceUrl);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Endpoint Message URL invalid", e);
+ } catch (Exception e) {
+ logger.error("Get singleProduceUrl is error : " + e);
+ }
+
+ try {
+ batchProduceUrl = context.getString("batchProduceUrl", "");
+ Preconditions.checkNotNull("".equals(batchProduceUrl), "batchProduceUrl must be set!!");
+ logger.info("Read Post Message URL from configuration : " + batchProduceUrl);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Endpoint Message URL invalid", e);
} catch (Exception e) {
- logger.error("Get postMsgUrl is error : " + e);
+ logger.error("Get batchProduceUrl is error : " + e);
}
try {
@@ -178,7 +179,7 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
public Status process() throws EventDeliveryException {
Status result = Status.READY;
Channel channel = getChannel();
- Transaction transaction = null;
+ Transaction transaction = null;
try {
logger.debug("Current Process Thread number ID :" + Thread.currentThread().getId());
transaction = channel.getTransaction();
@@ -200,26 +201,23 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
switch (topicName) {
//作为单条发送-新分类-20191219
/**
- * 非文件消息
+ * 非文件消息:NTC-CONN-RECORD-LOG,NTC-COLLECT-DNS-LOG,NTC-COLLECT-SSL-LOG
+ * 文件类消息:NTC-COLLECT-FILE-LOG(有独立文件标签)、
+ * NTC-COLLECT-HTTP-DOC-LOG、NTC-COLLECT-HTTP-AV-LOG、NTC-COLLECT-FTP-DOC-LOG、
+ * NTC-COLLECT-MAIL-LOG、NTC-COLLECT-TELNET-LOG
+ * 状态消息:INFLUX-SAPP-BPS-STAT-LOG(读取回写的influxDB合计数据用作状态上传)
*/
case "NTC-CONN-RECORD-LOG":
case "NTC-COLLECT-DNS-LOG":
case "NTC-COLLECT-SSL-LOG":
- /**
- * 文件消息
- */
- case "NTC-COLLECT-FILE-LOG"://发送独立出来的文件标签
+ case "NTC-COLLECT-FILE-LOG":
case "NTC-COLLECT-HTTP-DOC-LOG":
- case "NTC-COLLECT-HTTP-AV-LOG"://schema等同于NTC-COLLECT-HTTP-DOC-LOG
+ case "NTC-COLLECT-HTTP-AV-LOG":
case "NTC-COLLECT-FTP-DOC-LOG":
case "NTC-COLLECT-MAIL-LOG":
case "NTC-COLLECT-TELNET-LOG":
- /**
- * 状态消息
- */
- case "INFLUX-SAPP-BPS-STAT-LOG"://读取回写的influxDB合计数据用作状态上传
- sendMsgLog(transaction, contents);//20191209移除文件发送,仅处理消息
- //sendMsgController(transaction, contents);
+ case "INFLUX-SAPP-BPS-STAT-LOG":
+ sendMsgLog(transaction, contents);
break;
default:
logger.error("YbHttpAvroSinkFile can't find this topic:" + topicName + ".Please confirm this topicName is correct!!!");
@@ -241,7 +239,6 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
} finally {
if (transaction != null) {
transaction.close();
- logger.debug("close Transaction");
}
}
return result;
@@ -260,7 +257,7 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
}
if (statusCode == 200) {
if (StringUtils.isNotBlank(acCheckMsgResBody.getSessionId())) {
- logger.warn("AC msg successfully,msg sessionId is ===>" + acCheckMsgResBody.getSessionId());
+ logger.info("AC msg successfully,msg sessionId is ===>" + acCheckMsgResBody.getSessionId());
msgSessionCookie = acCheckMsgResBody.getSessionId();
}
} else if (statusCode == 0) {
@@ -269,8 +266,7 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
logger.error("AC msg from ZX is error,statusCode is " + statusCode + "(case)=" + acCheckMsgResBody.getCode() + "(getMethod)<===");
logger.error("This " + statusCode + " ResponseBody(contain sessionId) is ===>" + acCheckMsgResBody.toString() + "<===");
}
- updateConfigInfo();//getMsgSessionCookie()更新
-// return producerResBody;
+ updateConfigInfo();
}
/**
@@ -301,7 +297,9 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
private static ConfigInfo updateConfigInfo() {
configInfo.setCheckMsgUrl(checkMsgUrl);
- configInfo.setPostMsgUrl(postMsgUrl);
+ configInfo.setSingleProduceUrl(singleProduceUrl);
+ configInfo.setBatchProduceUrl(batchProduceUrl);
+
configInfo.setMsgSessionCookie(msgSessionCookie);
configInfo.setMonitorSessionCookie(monitorSessionCookie);//缓存monitorSessionCookie
@@ -314,147 +312,9 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
return configInfo;
}
-
-
/**
- * 发送消息控制器
- * @param transaction
- * @param contents
- */
- private void sendMsgController(Transaction transaction, List<String> contents) {
- try {
- AvroMonitorTimerTask.addTotalNum(contents.size());
- AvroMonitorTimerTask.addReadyPostNum(contents.size());
- /**
- * 获取状态回传sessionID,检查认证是否存在
- */
- if (StringUtils.isBlank(monitorSessionCookie)
- || StringUtils.isBlank(msgSessionCookie)) {
- getMonitorSessionCookie();//sendMsgLog-首次获取monitorSessionCookie
- getMsgSessionCookie();//sendMsgLog-msgSessionCookie为空,首次接入验证
- if (!checkTimerStart) {
- checkCookieEveryWeek();//sendMsgLog-第一次启动检测到monitorSessionCookie为空时启动任务但不进行验证,后续间隔一段时间后开始验证,每次申请monitorSessionCookie和msgSessionCookie两个Cookie
- checkTimerStart = true;
- logger.warn("CheckMsgAndFileCookie Timer is started......");
- }
-
- if (!monitorStart) {//消息定时上报
- AvroMonitorTimerTask.monitorMsg(monitorSessionCookie, postMsgUrl,
- "monitor-msg", 1, userAgent, topicName);//sendMsgLog-日志消息
- monitorStart = true;
- logger.warn("MonitorMsg Timer is started......");
- }
-
-
- if (!redirectContentsPostStart) {
- postRedirectDataEveryMin();
- redirectContentsPostStart = true;
- logger.warn("RedirectContents Timer Post is started......");
- }
- }
-
-
- if (isSingle(topicName)) {
-
- for (String content : contents) {
- pool.execute(new Runnable() {
- @Override
- public void run() {
- try {
-
- HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content,
- userAgent, msgSessionCookie, 1, System.currentTimeMillis());//初始发送count计数为0
- } catch (Exception e) {
- logger.error("sendMsgLog multi-thread is error==>" + e + "<==,Thread is==>" + Thread.currentThread().getName() + "<==.");
- }
- }
- });
- }
-
- } else {
- xTag = HttpClientUtil.getXTAG(contents.get(0), topicName);
- dataJson = contents.get(0);
- int size = contents.size() / 100;
- if (size > 0) {
- for (int i = 0; i < size; i++) {
- // byte[] msgResults = HttpClientUtil.encapsulationData(topicName, contents.subList(i*1, (i+1)*1));
- //concurrentLinkedQueue.add(msgResults);
- pool.execute(new Producer(contents.subList(i * 100, (i + 1) * 100)));
- }
- if (contents.size() % 100 > 0) {
- pool.execute(new Producer(contents.subList(size, contents.size())));
- }
- } else {
- pool.execute(new Producer(contents));
- }
- }
-
- } catch (Exception e) {
- logger.error("YbHttpAvroSinkFile send Msg is error===>" + e + "<===");
- transaction.commit();
- } finally {
- if (transaction != null) {
- transaction.commit();
- }
- AvroMonitorTimerTask.subReadyPostNum(contents.size());
- }
-
-
-
-
-
- }
-
-
-
-
- class Producer implements Runnable {
-
- private List<String> contents;
- public Producer(List<String> contents) { ;
- this.contents = contents;
- }
- @Override
- public void run() {
- try {
- logger.debug("Current Producer Thread number ID :" + Thread.currentThread().getId());
- byte[] msgResults = HttpClientUtil.encapsulationData(topicName, contents);
- concurrentLinkedQueue.add(msgResults);
-
- } catch (Exception e) {
- logger.error("YbHttpAvroSinkFile send Msg is error===>" + e + "<===");
-
- }
-
-
- }
- }
-
-
- class Consumer implements Runnable {
-
- @Override
- public void run() {
- logger.debug("Current Consumer Thread number ID :" + Thread.currentThread().getId());
- while(true) {
-
- if (concurrentLinkedQueue.isEmpty()) {
- logger.info("当前队列无数据,等待数据接入!");
- } else {
- byte[] result = concurrentLinkedQueue.poll();
- HttpClientUtil.asyncProducerAvroMessageToZX_toBatch(postMsgUrl, topicName, result, dataJson,
- userAgent, msgSessionCookie, 100, System.currentTimeMillis());//初始发送count计数为0
- logger.info("生产数据,等待数据接入!");
- }
- }
- }
- }
-
-
- /**
* 往zx发送文件数据的消息,即发送文件的message数据(结构化数据)
* 本来是作为文件消息发送,现该方法主要用于单条发送数据-20191224
- *
* @param transaction
* @param contents
*/
@@ -462,7 +322,7 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
try {
- logger.info("Current sendMsgLog Thread number ID :" + Thread.currentThread().getId());
+ logger.debug("Current sendMsgLog Thread number ID :" + Thread.currentThread().getId());
/**
* 获取状态回传sessionID,检查认证是否存在
@@ -478,33 +338,25 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
}
if (!monitorStart) {//消息定时上报
- AvroMonitorTimerTask.monitorMsg(monitorSessionCookie, postMsgUrl,
+ AvroMonitorTimerTask.monitorMsg(monitorSessionCookie, singleProduceUrl,
"monitor-msg", 1, userAgent, topicName);//sendMsgLog-日志消息
monitorStart = true;
logger.debug("MonitorMsg Timer is started......");
}
- if (!redirectContentsPostStart) {
- postRedirectDataEveryMin();
- redirectContentsPostStart = true;
- logger.debug("RedirectContents Timer Post is started......");
- }
+ logger.info("AC msg sessionId already exists,msg sessionId is ===>" + msgSessionCookie);
}
-
AvroMonitorTimerTask.addTotalNum(contents.size());
- logger.debug("AC msg sessionId already exists,msg sessionId is ===>" + msgSessionCookie);
AvroMonitorTimerTask.addReadyPostNum(contents.size());
if (isSingle(topicName)) {
for (String content : contents) {
-
pool.execute(new Runnable() {
@Override
public void run() {
try {
-
- HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content,
+ HttpClientUtil.singleAsyncProduceMessage(singleProduceUrl, topicName, content,
userAgent, msgSessionCookie, 1, System.currentTimeMillis());//初始发送count计数为0
} catch (Exception e) {
logger.error("sendMsgLog multi-thread is error==>" + e + "<==,Thread is==>" + Thread.currentThread().getName() + "<==.");
@@ -514,61 +366,30 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
}
} else {
long beginAvroTime = System.currentTimeMillis();
+ xTag = HttpClientUtil.getXTAG(contents.get(0), topicName);
byte[] msgResults = HttpClientUtil.encapsulationData(topicName, contents);
- logger.debug("Batch Avro and Encapsulation Data Time(ms):"+ (System.currentTimeMillis() - beginAvroTime) );
- HttpClientUtil.asyncProducerAvroMessageToZX_bk(postMsgUrl, topicName, msgResults, contents.get(0),
- userAgent, msgSessionCookie, contents.size(), System.currentTimeMillis());
- /* for (String content : contents) {
- HttpClientUtil.asyncProducerAvroMessageToZX_bk(postMsgUrl, topicName, msgResults, contents.get(0),
- userAgent, msgSessionCookie, contents.size(), System.currentTimeMillis());//初始发送count计数为0
- }*/
-
-
- /*for (String content : contents) {
-
- pool.execute(new Runnable() {
- @Override
- public void run() {
- try {
-
- HttpClientUtil.asyncProducerAvroMessageToZX_bk(postMsgUrl, topicName, msgResults, contents.get(0),
- userAgent, msgSessionCookie, contents.size(), System.currentTimeMillis());
- } catch (Exception e) {
- logger.error("sendMsgLog multi-thread is error==>" + e + "<==,Thread is==>" + Thread.currentThread().getName() + "<==.");
- }
- }
- });
- }*/
-
- /* for (String content : contents) {
-
- pool.execute(new Runnable() {
- @Override
- public void run() {
- try {
- SinkService.getInstance().producerAvroMessageToBus(postMsgUrl, topicName, msgResults, contents.get(0),
- userAgent, msgSessionCookie, contents.size());
- } catch (Exception e) {
- logger.error("sendMsgLog multi-thread is error==>" + e + "<==,Thread is==>" + Thread.currentThread().getName() + "<==.");
- }
- }
- });
- }*/
+ logger.info("Batch Avro and Encapsulation Data Time(ms):"+ (System.currentTimeMillis() - beginAvroTime) );
+ HttpClientUtil.batchAsyncProduceMessage(batchProduceUrl, topicName, msgResults, xTag,
+ userAgent, msgSessionCookie, contents.size(), System.currentTimeMillis());
}
} catch (Exception e) {
- logger.error("YbHttpAvroSinkFile send Msg is error===>" + e + "<===");
+ logger.error("YbHttpAvroSinkFile Send Msg is error===>" + e + "<===");
transaction.commit();
- } finally {
- if (transaction != null) {
- transaction.commit();
- }
- AvroMonitorTimerTask.subReadyPostNum(contents.size());
- }
+ } finally {
+ if (transaction != null) {
+ transaction.commit();
+ }
+ AvroMonitorTimerTask.subReadyPostNum(contents.size());
+ }
}
-
+ /**
+ * 单条消息发送处理逻辑
+ * @param topicName
+ * @return
+ */
private static boolean isSingle(String topicName) {
if (topicName.equals("NTC-COLLECT-FILE-LOG")
@@ -619,31 +440,25 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
}, 1000 * 60 * 60 * 24 * 7, 1000 * 60 * 60 * 24 * 7);//每隔7天执行一次
}
- /**
- * 重定向数据集上传定时器,每隔一段时间扫描并上传一次-用于上传因为服务器资源紧张暂未上传的数据
- */
- private void postRedirectDataEveryMin() {
- Timer timer = new Timer();
- timer.scheduleAtFixedRate(new TimerTask() {
- @Override
- public void run() {
- try {
- if (redirectContents.size() > 0) {
- List<String> tmpListFreq = new ArrayList<>(redirectContents);
- redirectContents.clear();
- AvroMonitorTimerTask.msgReadyPostSum += tmpListFreq.size();
- for (String content : tmpListFreq) {
- HttpClientUtil.asyncProducerAvroToZX(postMsgUrl, topicName, content, userAgent, msgSessionCookie, 0, System.currentTimeMillis());//postRedirectDataEveryMin定时器-初始发送count计数为0
- }
- logger.info("PostRedirectDataEveryMin post to zx RedirectData size==>" + tmpListFreq.size() + "<==.");
- }
- } catch (Exception e) {
- logger.error("PostRedirectDataEveryMin to zx everyMin is error===>>>" + e + "<===");
+
+ class RetrySendMessage implements Runnable {
+
+ @Override
+ public void run() {
+ logger.debug("Current Consumer Thread number ID :" + Thread.currentThread().getId());
+ while(true) {
+ if (!HttpClientUtil.getRetryQueue().isEmpty()) {
+ logger.info("当前队列大小:" + HttpClientUtil.getRetryQueue().size());
+ HttpClientUtil.singleAsyncProduceMessage(singleProduceUrl, topicName,
+ HttpClientUtil.getRetryQueue().poll(), userAgent, msgSessionCookie, 1, System.currentTimeMillis());
+
}
}
- }, 1000 * 60, 1000 * 60);//每隔1分钟执行一次
+ }
}
+
+
/**
* 动态负载均衡变更cookie-20200818
*
@@ -654,7 +469,8 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
/**
* 变更postMsgUrl与checkMsgUrl
*/
- postMsgUrl = redirectUrlPort + urlToolHm.get("post_suf_path");
+ singleProduceUrl = redirectUrlPort + urlToolHm.get("post_single_suf_path");
+ batchProduceUrl = redirectUrlPort + urlToolHm.get("post_batch_suf_path");
checkMsgUrl = redirectUrlPort + urlToolHm.get("check_suf_path");
/**
@@ -662,9 +478,14 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
*/
updateCookie();
- logger.info("YbHttpAvroSinkFile->changeUrl->change postMsgUrl:" + postMsgUrl + ",change checkMsgUrl:" + checkMsgUrl);
+ logger.warn("YbHttpAvroSinkFile->changeUrl->"
+ + "change singleProduceUrl:" + singleProduceUrl
+ + "change batchProduceUrl:" + batchProduceUrl
+ + ",change checkMsgUrl:" + checkMsgUrl);
}
+
+
/**
* 动态负载均衡更新cookie
*/
@@ -672,7 +493,8 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
getMonitorSessionCookie();//动态负载均衡修改url,重新获取cookie
getMsgSessionCookie();//动态负载均衡修改url,重新获取cookie
- logger.info("YbHttpAvroSinkFile->updateCookie update cookie,postMsgUrl:" + postMsgUrl
+ logger.warn("YbHttpAvroSinkFile->updateCookie update cookie,singleProduceUrl:" + singleProduceUrl
+ + "batchProduceUrl:" + batchProduceUrl
+ ",checkMsgUrl:" + checkMsgUrl
+ ",获取monitorSessionCookie:" + monitorSessionCookie
+ ",获取msgSessionCookie:" + msgSessionCookie);
@@ -685,7 +507,8 @@ public class YbHttpAvroSinkFile extends AbstractSink implements Configurable {
* @param urlType
*/
private static void makeUrlSplitMap(String oldUrlPath, String urlType) {
- String[] split = oldUrlPath.replace("http://", "").replace("https://", "").split("/", 2);
+ String[] split = oldUrlPath.replace("http://", "")
+ .replace("https://", "").split("/", 2);
if (split.length == 2) {
/*
diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/AvroMonitorTimerTask.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/AvroMonitorTimerTask.java
index d62c81a..7d5ae66 100644
--- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/AvroMonitorTimerTask.java
+++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/AvroMonitorTimerTask.java
@@ -88,7 +88,7 @@ public class AvroMonitorTimerTask {
if ("yb".equals(RealtimeCountConfig.MONITOR_TYPE)) {//只有当类型为一部(yb)时才进行状态上报
String sendMsg = getJson(RealtimeCountConfig.MONITOR_SYSTEM_COMPONENT_CODE, RealtimeCountConfig.MONITOR_SYSTEM_COMPONENT_CODE_FLUME, topicType);//新版-20200428
logger.info("Send monitor message is===>>>" + sendMsg + "<<<===");
- HttpClientUtil.asyncProducerAvroToZX(postMonitorUrl, monitorCategory, sendMsg, userAgent, monitorSessionCookie, 0, System.currentTimeMillis());//静态方法无返回值用于多线程,初始发送count计数为0
+ HttpClientUtil.singleAsyncProduceMessage(postMonitorUrl, monitorCategory, sendMsg, userAgent, monitorSessionCookie, 0, System.currentTimeMillis());//静态方法无返回值用于多线程,初始发送count计数为0
}
} catch (Exception e) {
logger.error("Send monitorMsg to zx is error===>>>" + e + "<===");
diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/HttpManager.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/HttpManager.java
index e479b4d..2c1372e 100644
--- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/HttpManager.java
+++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/avroUtils/HttpManager.java
@@ -32,6 +32,7 @@ import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.*;
@@ -41,6 +42,8 @@ import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.client.LaxRedirectStrategy;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.message.BasicHeaderElementIterator;
+import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.apache.log4j.Logger;
@@ -171,6 +174,21 @@ public class HttpManager {
// 声明重定向策略对象
LaxRedirectStrategy redirectStrategy = new LaxRedirectStrategy();
+
+ ConnectionKeepAliveStrategy myStrategy = (response, context) -> {
+ HeaderElementIterator it = new BasicHeaderElementIterator
+ (response.headerIterator(HTTP.CONN_KEEP_ALIVE));
+ while (it.hasNext()) {
+ HeaderElement he = it.nextElement();
+ String param = he.getName();
+ String value = he.getValue();
+ if (value != null && param.equalsIgnoreCase("timeout")) {
+ return Long.parseLong(value) * 1000;
+ }
+ }
+ return 10 * 1000;//如果没有约定,则默认定义时长为60s
+ };
+
/**
* 原版
*/
@@ -179,6 +197,7 @@ public class HttpManager {
.setDefaultRequestConfig(requestConfig)
.setRedirectStrategy(redirectStrategy)
.setRetryHandler(myRetryHandler)
+ .setKeepAliveStrategy(myStrategy)
.build();
return httpClient;
@@ -629,6 +648,7 @@ public class HttpManager {
httpPost.addHeader("User-Agent", userAgent);
httpPost.addHeader("X-Tag", xTag);//根据最新文档,目前已经不需要此头-20191217
httpPost.addHeader("Content-Type", "application/json");
+ httpPost.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE);
StringEntity payload = new StringEntity(data, Charset.forName("utf-8"));
//payload.setContentType("text/xml; charset=UTF-8");
payload.setContentEncoding("utf-8");
@@ -671,14 +691,15 @@ public class HttpManager {
// 执行httpClient.execute(httpGet)的异常
e.printStackTrace();
} finally {
- if (response != null) {
+ if (null != response) {
try {
+ EntityUtils.consumeQuietly(response.getEntity());
response.close();
} catch (IOException e) {
- e.printStackTrace();
+ logger.error("释放链接错误: " + e.getMessage());
}
}
- httpPost.abort();
+ //httpPost.abort();
}
return acResBody;
}
diff --git a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/bean/configBean/ConfigInfo.java b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/bean/configBean/ConfigInfo.java
index cd23d28..4da6319 100644
--- a/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/bean/configBean/ConfigInfo.java
+++ b/yb_http_avro_sink_file/src/main/java/cn/ac/iie/cusflume/sink/bean/configBean/ConfigInfo.java
@@ -9,7 +9,8 @@ package cn.ac.iie.cusflume.sink.bean.configBean;
*/
public class ConfigInfo {
- private String postMsgUrl;
+ private String singleProduceUrl;
+ private String batchProduceUrl;
private String postFileUrl;
private String checkMsgUrl;
@@ -26,12 +27,20 @@ public class ConfigInfo {
private String xTag;
private int batchSize;
- public String getPostMsgUrl() {
- return postMsgUrl;
+ public String getSingleProduceUrl() {
+ return singleProduceUrl;
}
- public void setPostMsgUrl(String postMsgUrl) {
- this.postMsgUrl = postMsgUrl;
+ public void setSingleProduceUrl(String singleProduceUrl) {
+ this.singleProduceUrl = singleProduceUrl;
+ }
+
+ public String getBatchProduceUrl() {
+ return batchProduceUrl;
+ }
+
+ public void setBatchProduceUrl(String batchProduceUrl) {
+ this.batchProduceUrl = batchProduceUrl;
}
public String getPostFileUrl() {
@@ -126,7 +135,8 @@ public class ConfigInfo {
@Override
public String toString() {
return "ConfigInfo{" +
- "postMsgUrl='" + postMsgUrl + '\'' +
+ ", singleProduceUrl='" + singleProduceUrl + '\'' +
+ ", batchProduceUrl='" + batchProduceUrl + '\'' +
", postFileUrl='" + postFileUrl + '\'' +
", checkMsgUrl='" + checkMsgUrl + '\'' +
", checkFileUrl='" + checkFileUrl + '\'' +
diff --git a/yb_http_avro_sink_file/src/main/resources/flume_config.properties b/yb_http_avro_sink_file/src/main/resources/flume_config.properties
index a1bc883..4530cc2 100644
--- a/yb_http_avro_sink_file/src/main/resources/flume_config.properties
+++ b/yb_http_avro_sink_file/src/main/resources/flume_config.properties
@@ -2,19 +2,19 @@
http.async.parallelism=10
#异步Http客户端-等待数据超时时间,根据业务调整
-http.async.socketTimeout=10000
+http.async.socketTimeout=60000
#异步Http客户端-连接超时时间
-http.async.connectTimeout=10000
+http.async.connectTimeout=30000
#异步Http客户端-连接池最大连接数
-http.async.poolSize=80
+http.async.poolSize=4
#异步Http客户端-每个主机的并发最多只有1500
-http.async.maxPerRoute=80
+http.async.maxPerRoute=4
#异步Http客户端-从连接池中后去连接的timeout时间
-http.async.connectionRequestTimeout=15000
+http.async.connectionRequestTimeout=30000
#Schema配置信息