summaryrefslogtreecommitdiff
path: root/src/main/java/cn/ac/iie/common
diff options
context:
space:
mode:
authorcaohui <[email protected]>2020-04-29 14:32:05 +0800
committercaohui <[email protected]>2020-04-29 14:32:05 +0800
commitd15d7536f385ec4a1250ed15ed52fd6c05eb7431 (patch)
tree737ec8462ef62ac70caeee1533cbee4e76ceef98 /src/main/java/cn/ac/iie/common
VoIP Knowledge Base sip-voip-completion Initial commit 202004291431HEADmaster
Diffstat (limited to 'src/main/java/cn/ac/iie/common')
-rw-r--r--src/main/java/cn/ac/iie/common/CommonService.java68
-rw-r--r--src/main/java/cn/ac/iie/common/DataCenterLoad.java93
-rw-r--r--src/main/java/cn/ac/iie/common/HashTableConfig.java354
-rw-r--r--src/main/java/cn/ac/iie/common/HttpManager.java218
-rw-r--r--src/main/java/cn/ac/iie/common/RealtimeCountConfig.java99
5 files changed, 832 insertions, 0 deletions
diff --git a/src/main/java/cn/ac/iie/common/CommonService.java b/src/main/java/cn/ac/iie/common/CommonService.java
new file mode 100644
index 0000000..4cf8ec0
--- /dev/null
+++ b/src/main/java/cn/ac/iie/common/CommonService.java
@@ -0,0 +1,68 @@
+package cn.ac.iie.common;
+
+import java.io.Serializable;
+
+import org.apache.log4j.Logger;
+import cn.ac.iie.bean.ConfigCompile;;
+
+public class CommonService implements Serializable{
+
+ private static final long serialVersionUID = 6106510579752162633L;
+ private static Logger logger = Logger.getLogger(CommonService.class);
+ private static final Integer COMPILE_OPTION_NUMBER = 9;
+
+ public String[] splitMessageWithLogType(String message,String logType) {
+ Integer ZiYong_LOG_LENGTH = RealtimeCountConfig.LOG_COMMON_FIELD_NUM + HashTableConfig.LOG_SUB_OPTION_NUM_MAP.get(logType);
+ if(message!=null && !message.isEmpty()) {
+ String[] values = message.split(RealtimeCountConfig.LOG_STRING_SPLITTER);//分割符"\t"
+ if(values.length == ZiYong_LOG_LENGTH) {
+ return values;
+ } else {
+ logger.error(logType+"--->message length = " + values.length + " is illegal");//测试用
+ return null;
+ }
+ }
+ return null;
+ }
+
+ public ConfigCompile checkReplyFromRedis(String redisReply){
+ if(redisReply == null){
+ return null;
+ }
+ String[] str = redisReply.split("\t");
+ if(str.length == COMPILE_OPTION_NUMBER){
+ try{
+ ConfigCompile cc = new ConfigCompile(str);
+ if(cc.getSERVICE().equals(RealtimeCountConfig.EMPTY_OPTION_CHARACTER)){
+ logger.error("私有标签为空, 配置id为: "+ cc.getCOMPILE_ID());
+ return null;
+ }
+ return cc;
+ } catch (Exception e){
+ e.printStackTrace();
+ return null;
+ }
+ }else{
+ return null;
+ }
+ }
+
+ public ConfigCompile checkPz(String pzStr){
+ String[] str = pzStr.split("\t");
+ if(str.length == COMPILE_OPTION_NUMBER){
+ try{
+ ConfigCompile cc = new ConfigCompile(str);
+ if(cc.getSERVICE().equals(RealtimeCountConfig.EMPTY_OPTION_CHARACTER)){
+ logger.error("私有标签为空, 配置id为: "+ cc.getCOMPILE_ID());
+ return null;
+ }
+ return cc;
+ } catch (Exception e){
+ e.printStackTrace();
+ return null;
+ }
+ }else{
+ return null;
+ }
+ }
+}
diff --git a/src/main/java/cn/ac/iie/common/DataCenterLoad.java b/src/main/java/cn/ac/iie/common/DataCenterLoad.java
new file mode 100644
index 0000000..eeb1a3a
--- /dev/null
+++ b/src/main/java/cn/ac/iie/common/DataCenterLoad.java
@@ -0,0 +1,93 @@
+package cn.ac.iie.common;
+
+import org.apache.log4j.Logger;
+
+import java.text.SimpleDateFormat;
+import java.util.Map;
+
+public class DataCenterLoad {
+ private static final String url = HttpManager.getInfoLoadInstance().getAddress();
+ private static Logger logger = Logger.getLogger(DataCenterLoad.class);
+
+ public DataCenterLoad() {
+ }
+
+ private String generateTimeWithInterval() {
+ Long stamp = System.currentTimeMillis() + 300000L;
+ Long stamp5 = stamp / 300000 * 300000;
+ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ return df.format(stamp5);
+ }
+
+ public void dfPzFlowBatchStorage(Map<String, Long> pzMap) {
+ //String sql = " insert into DF_PZ_REPORT(STAT_ID, ACTIVE_SYS, CFG_ID, SERVICE, SUM, REPORT_TIME) " +
+ // " VALUES(SEQ_DF_PZ_REPORT.NEXTVAL, ?, ?, ?, ?, ?)";
+ StringBuffer sb = new StringBuffer();
+ String time5 = generateTimeWithInterval();
+ int nums = 0;
+ for (String key : pzMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+ if (options[0] != null && options[0] != "" && options[1] != null && options[1] != "" && options[2] != null && options[2] != "" && options[3] != null && options[3] != "" && options[4] != null && options[4] != "" && options[5] != null && options[5] != "") {
+ String aItem = options[0] + "," + options[1] + "," + options[2] + "," + options[3] + "," + options[4] + "," + options[5] + "," + pzMap.get(key) + "," + time5;
+ sb.append(aItem + "\n");
+ nums++;
+ if (nums >= RealtimeCountConfig.BATCH_INSERT_NUM) {
+ String data = sb.substring(0, sb.length() - 1);
+ //输出的topic
+ logger.info("start to post data to dc---------> " + data);
+ System.out.println("start to post data to dc---------> " + data);
+ HttpManager.getInfoLoadInstance().postToDataCenter(url, "DF_PZ_FLOW_REPORT", data);//原本的方法
+ sb.setLength(0);
+ nums = 0;
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ try {
+ if (nums != 0) {
+ String data = sb.substring(0, sb.length() - 1);
+ HttpManager.getInfoLoadInstance().postToDataCenter(url, "DF_PZ_FLOW_REPORT", data);
+ sb.setLength(0);
+ nums = 0;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void dfPzBatchStorage(Map<String, Long> pzMap) {
+ //String sql = " insert into DF_PZ_REPORT(STAT_ID, ACTIVE_SYS, CFG_ID, SERVICE, SUM, REPORT_TIME) " +
+ // " VALUES(SEQ_DF_PZ_REPORT.NEXTVAL, ?, ?, ?, ?, ?)";
+ StringBuffer sb = new StringBuffer();
+ String time5 = generateTimeWithInterval();
+ int nums = 0;
+ for (String key : pzMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+ String aItem = options[2] + "\t" + options[1] + "\t" + options[3] + "\t" + pzMap.get(key) + "\t" + time5;
+ sb.append(aItem + "\n");
+ nums++;
+ if (nums >= RealtimeCountConfig.BATCH_INSERT_NUM) {
+ String data = sb.substring(0, sb.length() - 1);
+ HttpManager.getInfoLoadInstance().postToDataCenter(url, "t_xa_df_pz_report_dt", data);
+ sb.setLength(0);
+ nums = 0;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ try {
+ if (nums != 0) {
+ String data = sb.substring(0, sb.length() - 1);
+ HttpManager.getInfoLoadInstance().postToDataCenter(url, "t_xa_df_pz_report_dt", data);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/src/main/java/cn/ac/iie/common/HashTableConfig.java b/src/main/java/cn/ac/iie/common/HashTableConfig.java
new file mode 100644
index 0000000..4ee640f
--- /dev/null
+++ b/src/main/java/cn/ac/iie/common/HashTableConfig.java
@@ -0,0 +1,354 @@
+package cn.ac.iie.common;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+public class HashTableConfig implements Serializable {
+
+ private static final long serialVersionUID = -6843770738516927321L;
+
+ /**
+ * ISO 3166-1 alpha2
+ */
+ public static final Map<String, String> ISO_3166_1_ALPHA_2 = new HashMap<String, String>() {
+
+ // private static final long serialVersionUID = 5231960246987011322L;
+ private static final long serialVersionUID = -6972673762779232428L;
+
+ {
+
+ put("阿富汗","AF");
+ put("奥兰","AX");
+ put("阿尔巴尼亚","AL");
+ put("阿尔及利亚","DZ");
+ put("美属萨摩亚","AS");
+ put("安道尔","AD");
+ put("安哥拉","AO");
+ put("安圭拉","AI");
+ put("南极洲","AQ");
+ put("安提瓜和巴布达","AG");
+ put("阿根廷","AR");
+ put("亚美尼亚","AM");
+ put("阿鲁巴","AW");
+ put("澳大利亚","AU");
+ put("奥地利","AT");
+ put("阿塞拜疆","AZ");
+ put("巴哈马","BS");
+ put("巴林","BH");
+ put("孟加拉国","BD");
+ put("巴巴多斯","BB");
+ put("白俄罗斯","BY");
+ put("比利时","BE");
+ put("伯利兹","BZ");
+ put("贝宁","BJ");
+ put("百慕大","BM");
+ put("不丹","BT");
+ put("玻利维亚","BO");
+ put("荷兰加勒比区","BQ");
+ put("波斯尼亚和黑塞哥维那","BA");
+ put("博茨瓦纳","BW");
+ put("布韦岛","BV");
+ put("巴西","BR");
+ put("英属印度洋领地","IO");
+ put("文莱","BN");
+ put("保加利亚","BG");
+ put("布基纳法索","BF");
+ put("布隆迪","BI");
+ put("佛得角","CV");
+ put("柬埔寨","KH");
+ put("喀麦隆","CM");
+ put("加拿大","CA");
+ put("开曼群岛","KY");
+ put("中非","CF");
+ put("乍得","TD");
+ put("智利","CL");
+ put("中国","CN");
+ put("圣诞岛","CX");
+ put("科科斯(基林)群岛","CC");
+ put("哥伦比亚","CO");
+ put("科摩罗","KM");
+ put("刚果(布)","CG");
+ put("刚果(金)","CD");
+ put("库克群岛","CK");
+ put("哥斯达黎加","CR");
+ put("科特迪瓦","CI");
+ put("克罗地亚","HR");
+ put("古巴","CU");
+ put("库拉索","CW");
+ put("塞浦路斯","CY");
+ put("捷克","CZ");
+ put("丹麦","DK");
+ put("吉布提","DJ");
+ put("多米尼克","DM");
+ put("多米尼加","DO");
+ put("厄瓜多尔","EC");
+ put("埃及","EG");
+ put("萨尔瓦多","SV");
+ put("赤道几内亚","GQ");
+ put("厄立特里亚","ER");
+ put("爱沙尼亚","EE");
+ put("埃塞俄比亚","ET");
+ put("福克兰群岛","FK");
+ put("法罗群岛","FO");
+ put("斐济","FJ");
+ put("芬兰","FI");
+ put("法国","FR");
+ put("法属圭亚那","GF");
+ put("法属波利尼西亚","PF");
+ put("法属南方和南极洲领地","TF");
+ put("加蓬","GA");
+ put("冈比亚","GM");
+ put("格鲁吉亚","GE");
+ put("德国","DE");
+ put("加纳","GH");
+ put("直布罗陀","GI");
+ put("希腊","GR");
+ put("格陵兰","GL");
+ put("格林纳达","GD");
+ put("瓜德罗普","GP");
+ put("关岛","GU");
+ put("危地马拉","GT");
+ put("根西","GG");
+ put("几内亚","GN");
+ put("几内亚比绍","GW");
+ put("圭亚那","GY");
+ put("海地","HT");
+ put("赫德岛和麦克唐纳群岛","HM");
+ put("梵蒂冈","VA");
+ put("洪都拉斯","HN");
+ put("香港","HK");
+ put("匈牙利","HU");
+ put("冰岛","IS");
+ put("印度","IN");
+ put("印尼","ID");
+ put("伊朗","IR");
+ put("伊拉克","IQ");
+ put("爱尔兰","IE");
+ put("马恩岛","IM");
+ put("以色列","IL");
+ put("意大利","IT");
+ put("牙买加","JM");
+ put("日本","JP");
+ put("泽西","JE");
+ put("约旦","JO");
+ put("哈萨克斯坦","KZ");
+ put("肯尼亚","KE");
+ put("基里巴斯","KI");
+ put("朝鲜","KP");
+ put("韩国","KR");
+ put("科威特","KW");
+ put("吉尔吉斯斯坦","KG");
+ put("老挝","LA");
+ put("拉脱维亚","LV");
+ put("黎巴嫩","LB");
+ put("莱索托","LS");
+ put("利比里亚","LR");
+ put("利比亚","LY");
+ put("列支敦士登","LI");
+ put("立陶宛","LT");
+ put("卢森堡","LU");
+ put("澳门","MO");
+ put("马其顿","MK");
+ put("马达加斯加","MG");
+ put("马拉维","MW");
+ put("马来西亚","MY");
+ put("马尔代夫","MV");
+ put("马里","ML");
+ put("马耳他","MT");
+ put("马绍尔群岛","MH");
+ put("马提尼克","MQ");
+ put("毛里塔尼亚","MR");
+ put("毛里求斯","MU");
+ put("马约特","YT");
+ put("墨西哥","MX");
+ put("密克罗尼西亚联邦","FM");
+ put("摩尔多瓦","MD");
+ put("摩纳哥","MC");
+ put("蒙古国","MN");
+ put("黑山","ME");
+ put("蒙特塞拉特","MS");
+ put("摩洛哥","MA");
+ put("莫桑比克","MZ");
+ put("缅甸","MM");
+ put("纳米比亚","NA");
+ put("瑙鲁","NR");
+ put("尼泊尔","NP");
+ put("荷兰","NL");
+ put("新喀里多尼亚","NC");
+ put("新西兰","NZ");
+ put("尼加拉瓜","NI");
+ put("尼日尔","NE");
+ put("尼日利亚","NG");
+ put("纽埃","NU");
+ put("诺福克岛","NF");
+ put("北马里亚纳群岛","MP");
+ put("挪威","NO");
+ put("阿曼","OM");
+ put("巴基斯坦","PK");
+ put("帕劳","PW");
+ put("巴勒斯坦","PS");
+ put("巴拿马","PA");
+ put("巴布亚新几内亚","PG");
+ put("巴拉圭","PY");
+ put("秘鲁","PE");
+ put("菲律宾","PH");
+ put("皮特凯恩群岛","PN");
+ put("波兰","PL");
+ put("葡萄牙","PT");
+ put("波多黎各","PR");
+ put("卡塔尔","QA");
+ put("留尼汪","RE");
+ put("罗马尼亚","RO");
+ put("俄罗斯","RU");
+ put("卢旺达","RW");
+ put("圣巴泰勒米","BL");
+ put("圣赫勒拿、阿森松和特里斯坦-达库尼亚","SH");
+ put("圣基茨和尼维斯","KN");
+ put("圣卢西亚","LC");
+ put("法属圣马丁","MF");
+ put("圣皮埃尔和密克隆","PM");
+ put("圣文森特和格林纳丁斯","VC");
+ put("萨摩亚","WS");
+ put("圣马力诺","SM");
+ put("圣多美和普林西比","ST");
+ put("沙特阿拉伯","SA");
+ put("塞内加尔","SN");
+ put("塞尔维亚","RS");
+ put("塞舌尔","SC");
+ put("塞拉利昂","SL");
+ put("新加坡","SG");
+ put("圣马丁","SX");
+ put("斯洛伐克","SK");
+ put("斯洛文尼亚","SI");
+ put("所罗门群岛","SB");
+ put("索马里","SO");
+ put("南非","ZA");
+ put("南乔治亚和南桑威奇群岛","GS");
+ put("南苏丹","SS");
+ put("西班牙","ES");
+ put("斯里兰卡","LK");
+ put("苏丹","SD");
+ put("苏里南","SR");
+ put("斯瓦尔巴和扬马延","SJ");
+ put("斯威士兰","SZ");
+ put("瑞典","SE");
+ put("瑞士","CH");
+ put("叙利亚","SY");
+ put("台湾","TW");
+ put("塔吉克斯坦","TJ");
+ put("坦桑尼亚","TZ");
+ put("泰国","TH");
+ put("东帝汶","TL");
+ put("多哥","TG");
+ put("托克劳","TK");
+ put("汤加","TO");
+ put("特立尼达和多巴哥","TT");
+ put("突尼斯","TN");
+ put("土耳其","TR");
+ put("土库曼斯坦","TM");
+ put("特克斯和凯科斯群岛","TC");
+ put("图瓦卢","TV");
+ put("乌干达","UG");
+ put("乌克兰","UA");
+ put("阿联酋","AE");
+ put("英国","GB");
+ put("美国","US");
+ put("美国本土外小岛屿","UM");
+ put("乌拉圭","UY");
+ put("乌兹别克斯坦","UZ");
+ put("瓦努阿图","VU");
+ put("委内瑞拉","VE");
+ put("越南","VN");
+ put("英属维尔京群岛","VG");
+ put("美属维尔京群岛","VI");
+ put("瓦利斯和富图纳","WF");
+ put("西撒哈拉","EH");
+ put("也门","YE");
+ put("赞比亚","ZM");
+ put("津巴布韦","ZW");
+
+ }
+ };
+
+ /**
+ * 业务类型
+ */
+ public static final Map<String, Integer> SERVICE_TYPE_MAP = new HashMap<String, Integer>() {
+ private static final long serialVersionUID = 8445342694006806126L;
+
+ {
+ put("DF-IP-PORT-LOG", 1);
+ put("FX-IP-PORT", 2);
+ put("DF-HTTP-REQ-LOG", 3);
+ put("DF-HTTP-RES-LOG", 4);
+ put("DF-HTTP-KEYWORD-LOG", 5);
+ put("DF-DNS-LOG", 6);
+ put("DF-PPTP-LOG", 7);
+ put("DF-L2TP-LOG", 8);
+ put("DF-IPSEC-LOG", 9);
+ put("DF-OPENVPN-LOG", 10);
+ put("DF-SSH-LOG", 11);
+ put("DF-SSL-LOG", 12);
+ put("DF-MAIL-LOG", 13);
+ put("DF-FTP-LOG", 14);
+
+ put("DJ-IP-PORT-LOG", 48);
+ put("DJ-HTTP-REQ-LOG", 49);
+ put("DJ-HTTP-RES-LOG", 50);
+ put("DJ-HTTP-KEYWORD-LOG", 51);
+ put("DJ-DNS-LOG", 52);
+ put("DJ-FTP-LOG", 53);
+ put("DJ-PPTP-LOG", 54);
+ put("DJ-L2TP-LOG", 55);
+ put("DJ-IPSEC-LOG", 56);
+ put("DJ-OPENVPN-LOG", 57);
+ put("DJ-SSH-LOG", 58);
+ put("DJ-SSL-LOG", 59);
+ put("DJ-MAIL-LOG", 60);
+ }
+ };
+
+ //日志表字段数(不包括id,因为id前面不传回来)-----20181228修改
+ public static final Map<String, Integer> LOG_SUB_OPTION_NUM_MAP = new HashMap<String, Integer>() {
+
+ private static final long serialVersionUID = 5231960246987011322L;
+
+ {
+ put("DF-IP-PORT-LOG", 0);
+ put("DF-HTTP-REQ-LOG", 12);
+ put("DF-HTTP-RES-LOG", 15);
+ put("DF-HTTP-KEYWORD-LOG", 10);
+ put("DF-MAIL-LOG", 5);
+ put("DF-DNS-LOG", 9);
+ put("DF-FTP-LOG", 1);
+ put("DF-PPTP-LOG", 3);
+ put("DF-L2TP-LOG", 4);
+ put("DF-IPSEC-LOG", 2);
+ put("DF-OPENVPN-LOG", 4);
+ put("DF-SSH-LOG", 6);
+ put("DF-SSL-LOG", 6);
+ put("DF-TUNNEL-RANDOM-LOG", 1);
+
+ put("NTC-CONN-RECORD-LOG", 5);//正式用,这是表上的字段数
+// put("NTC-CONN-RECORD-LOG", 3);//测试用,这是测试数据上的字段数
+
+
+ put("DJ-IP-PORT-LOG", 0);
+ put("DJ-HTTP-REQ-LOG", 12);
+ put("DJ-HTTP-RES-LOG", 15);
+ put("DJ-HTTP-KEYWORD-LOG", 10);
+ put("DJ-MAIL-LOG", 5);
+ put("DJ-DNS-LOG", 9);
+ put("DJ-FTP-LOG", 1);
+ put("DJ-PPTP-LOG", 3);
+ put("DJ-L2TP-LOG", 4);
+ put("DJ-IPSEC-LOG", 2);
+ put("DJ-OPENVPN-LOG", 4);
+ put("DJ-SSH-LOG", 6);
+ put("DJ-SSL-LOG", 6);
+
+ }
+ };
+
+} \ No newline at end of file
diff --git a/src/main/java/cn/ac/iie/common/HttpManager.java b/src/main/java/cn/ac/iie/common/HttpManager.java
new file mode 100644
index 0000000..a1aa590
--- /dev/null
+++ b/src/main/java/cn/ac/iie/common/HttpManager.java
@@ -0,0 +1,218 @@
+package cn.ac.iie.common;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.MalformedURLException;
+import java.net.UnknownHostException;
+import java.nio.charset.Charset;
+import java.util.Random;
+import javax.net.ssl.SSLException;
+
+//import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpEntityEnclosingRequest;
+import org.apache.http.HttpRequest;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.HttpRequestRetryHandler;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.client.LaxRedirectStrategy;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.util.EntityUtils;
+import org.apache.log4j.Logger;
+
+
+public class HttpManager {
+ // 创建httpclient连接池
+ private PoolingHttpClientConnectionManager httpClientConnectionManager = null;
+ private CloseableHttpClient httpClient = null;
+ //类初始化时,自动实例化,饿汉单例模式
+ private static final HttpManager manager = new HttpManager();
+ private static Logger logger = Logger.getLogger(HttpManager.class);
+
+ public static HttpManager getInfoLoadInstance(){
+ return manager;
+ }
+
+ private HttpManager(){
+ //初始化httpClient
+ initHttpClient();
+ System.setProperty("sun.net.inetaddr.ttl", "300");
+ System.setProperty("sun.net.inetaddr.negative.ttl", "10");
+ }
+
+ public void initHttpClient(){
+ //创建httpclient连接池
+ httpClientConnectionManager = new PoolingHttpClientConnectionManager();
+ //设置连接池最大数量
+ httpClientConnectionManager.setMaxTotal(2000);
+ //设置单个路由最大连接数量
+ httpClientConnectionManager.setDefaultMaxPerRoute(400);
+
+ httpClient=getHttpClient();
+ }
+ //请求重试机制
+
+ HttpRequestRetryHandler myRetryHandler = new HttpRequestRetryHandler() {
+ @Override
+ public boolean retryRequest(IOException exception, int executionCount, HttpContext context) {
+ if (executionCount >= 2) {
+ // 超过两次则不再重试请求
+ logger.error("http连接已重试"+executionCount+"次, 重试失败");
+ return false;
+ }
+ if (exception instanceof InterruptedIOException) {
+ // Timeout
+ logger.error("InterruptedIOException, 重试连接。。。");
+ return true;
+ }
+ if (exception instanceof UnknownHostException) {
+ // Unknown host
+ return false;
+ }
+ if (exception instanceof ConnectTimeoutException) {
+ logger.error("ConnectTimeoutException, 重试连接。。。");
+ // Connection refused
+ return true;
+ }
+ if (exception instanceof SSLException) {
+ // SSL handshake exception
+ return false;
+ }
+ HttpClientContext clientContext = HttpClientContext.adapt(context);
+ HttpRequest request = clientContext.getRequest();
+ boolean idempotent = !(request instanceof HttpEntityEnclosingRequest);
+ if (idempotent) {
+ logger.error("request is idempotent, 重试连接。。。");
+ // Retry if the request is considered idempotent
+ return true;
+ }
+ return false;
+ }
+ };
+
+ public CloseableHttpClient getHttpClient(){
+ // 创建全局的requestConfig
+ RequestConfig requestConfig = RequestConfig.custom()
+ .setConnectTimeout(3000)
+ .setSocketTimeout(3000)
+ //.setCookieSpec(CookieSpecs.BEST_MATCH)
+ .build();
+ // 声明重定向策略对象
+ LaxRedirectStrategy redirectStrategy = new LaxRedirectStrategy();
+
+ CloseableHttpClient httpClient = HttpClients.custom().setConnectionManager(httpClientConnectionManager)
+ .setDefaultRequestConfig(requestConfig)
+ .setRedirectStrategy(redirectStrategy)
+ .setRetryHandler(myRetryHandler)
+ .build();
+ return httpClient;
+ }
+
+ public String getAddress(){
+ String[] addrs = RealtimeCountConfig.DATACENTER_ADDRS.split(",");
+
+ Random rnd = new Random();
+ Integer addrIndex = rnd.nextInt(addrs.length);
+ return addrs[addrIndex].trim();
+ }
+
+
+ public void postToDataCenter(String url, String topic, String data){
+ CloseableHttpResponse response = null;
+ HttpPost httpPost = null;
+ url = url.trim();
+ try {
+ httpPost = new HttpPost(url);
+// httpPost.addHeader("Connection","keep-alive");
+// httpPost.addHeader("Accept-Encoding", "gzip, deflate");
+ //httpPost.addHeader("User-Agent", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36");
+
+// httpPost.addHeader("User", RealtimeCountConfig.DATACENTER_USERNAME);
+// httpPost.addHeader("Password", RealtimeCountConfig.DATACENTER_PASSWORD);
+ httpPost.addHeader("Topic", topic);
+ httpPost.addHeader("Schema-Version", "2");
+ httpPost.addHeader("Format", "csv");
+// httpPost.addHeader("Row-Split", "\\n");
+// httpPost.addHeader("Field-Split", "\\t");
+ httpPost.addHeader("Row-Split", "\\n");
+ httpPost.addHeader("Field-Split", ",");
+// StringEntity payload = new StringEntity(data, Charset.forName("utf-8"));
+ StringEntity payload = new StringEntity(data);
+ //payload.setContentType("text/xml; charset=UTF-8");
+// payload.setContentEncoding("utf-8");
+ httpPost.setEntity(payload);
+ logger.info("数据中心加载内容: " + data);
+ //执行请求
+ response = httpClient.execute(httpPost);
+ try{
+ int statuCode = response.getStatusLine().getStatusCode();
+ //Header[] headers = response.getAllHeaders();
+ //logger.info("<<response header>>:");
+ //System.out.println("<<response header>>:");
+ //for(int i=0; i<headers.length; i++){
+ // logger.info(headers[i].getName() +" : "+headers[i].getValue());
+ //System.out.println(headers[i].getName() +" : "+headers[i].getValue());
+ //}
+ HttpEntity entity = response.getEntity();
+ if(statuCode==200){
+ logger.info("数据中心加载成功, 返回码: "+ statuCode);
+ System.out.println("数据中心加载成功, 返回码: " + statuCode);
+ EntityUtils.consume(entity);
+ }else{
+ String ret = EntityUtils.toString(entity);
+ EntityUtils.consume(entity);
+ logger.info("数据中心加载失败: "+ret+" --- code: "+statuCode+" ---失败数据为: \n"+data);
+ System.out.println("数据中心加载失败: " + ret + " --- code: " + statuCode + " ---失败数据为: \n" + data);
+ logger.error("数据中心加载失败: "+ret+" --- code: "+statuCode);
+ System.out.println("数据中心加载失败: " + ret + " --- code: " + statuCode);
+ }
+ } catch (Exception e){
+ e.printStackTrace();
+ }
+
+ } catch (MalformedURLException e) {
+ //执行URL url = new URL()的异常
+ e.printStackTrace();
+ } catch (ClientProtocolException e) {
+ // 执行httpClient.execute(httpGet)的异常
+ e.printStackTrace();
+ } catch (IOException e) {
+ // 执行httpClient.execute(httpGet)的异常
+ e.printStackTrace();
+ } finally{
+ if(response != null){
+ try {
+ response.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ httpPost.abort();
+ /**
+ * httpclient的链接有线程池管理,这里不用直接关闭
+ */
+// try {//关闭连接
+// httpClient.close();
+// } catch (IOException e) {
+// e.printStackTrace();
+// }
+ }
+ }
+
+// public static void main(String[] args) throws InterruptedException {
+// // TODO Auto-generated method stub
+// for(int i=0; i<100000; i++){
+// System.out.println("------------- "+i+" ------------");
+// DoubleWriteHttpManager.getInfoLoadInstance().postToDataCenter("http://www.runoob.com/try/ajax/demo_test_post.php", "topic", "data");
+// }
+// }
+
+}
diff --git a/src/main/java/cn/ac/iie/common/RealtimeCountConfig.java b/src/main/java/cn/ac/iie/common/RealtimeCountConfig.java
new file mode 100644
index 0000000..0184f45
--- /dev/null
+++ b/src/main/java/cn/ac/iie/common/RealtimeCountConfig.java
@@ -0,0 +1,99 @@
+package cn.ac.iie.common;
+
+import cn.ac.iie.utils.RealtimeCountConfigurations;
+
+import java.io.Serializable;
+
+public class RealtimeCountConfig implements Serializable{
+
+ private static final long serialVersionUID = -8649024767966235184L;
+ public static final String LOG_STRING_SPLITTER = "\t";
+ public static final String BETWEEN_BOLTS_SPLITTER = "~=~";
+ public static final String EMPTY_OPTION_CHARACTER = "-";
+ /**
+ * 通用log表字段数
+ */
+ public static final Integer LOG_COMMON_FIELD_NUM = 23;//公共表字段数(不包括id,因为id前面不传回来,id为自增)
+
+ //-----------------realtime_config.properties------------------
+ public static final String BOOTSTRAP_SERVERS = RealtimeCountConfigurations.getStringProperty(0, "bootstrap.servers");
+ public static final String BOOTSTRAP_OUTPUT_SERVERS = RealtimeCountConfigurations.getStringProperty(0, "bootstrap.output.servers");
+ public static final String ACTIVE_SYSTEM = RealtimeCountConfigurations.getStringProperty(0, "active.system");
+ public static final Integer BATCH_INSERT_NUM = RealtimeCountConfigurations.getIntProperty(0, "batch.insert.num");
+ public static final String GROUP_ID = RealtimeCountConfigurations.getStringProperty(0, "group.id");
+ public static final String KAFKA_TOPIC = RealtimeCountConfigurations.getStringProperty(0, "kafka.topic");
+ public static final String KAFKA_NTC_ORI_TOPIC = RealtimeCountConfigurations.getStringProperty(0, "kafka.ntc.ori.topic");
+ public static final String KAFKA_SIP_ORIGIN_TOPIC = RealtimeCountConfigurations.getStringProperty(0, "kafka.sip.origin.topic");
+ public static final String KAFKA_NTC_KILLED_TOPIC = RealtimeCountConfigurations.getStringProperty(0, "kafka.ntc.killed.topic");
+ public static final String KAFKA_SIP_COMPLEMENT_TOPIC = RealtimeCountConfigurations.getStringProperty(0, "kafka.sip.complement.topic");
+ public static final String KAFKA_ROUTE_RELATION_TOPIC = RealtimeCountConfigurations.getStringProperty(0, "kafka.route.relation.topic");
+
+
+ public static final String ALL_LOG_OUTPUT_CONTROLLER = RealtimeCountConfigurations.getStringProperty(0, "all.log.output.controller");//全局catch日志打印控制器
+ public static final String PART_LOG_OUTPUT_CONTROLLER = RealtimeCountConfigurations.getStringProperty(0, "part.log.output.controller");//局部日志输出控制器
+
+
+ public static final String GROUP_ID_PREFIX = RealtimeCountConfigurations.getStringProperty(0, "group.id.prefix");//groupid前缀
+ public static final String GROUP_ID_SUFFIX = RealtimeCountConfigurations.getStringProperty(0, "group.id.suffix");//groupid后缀
+ public static final String FETCH_MAX_BYTES = RealtimeCountConfigurations.getStringProperty(0, "fetch.max.bytes");
+ public static final String MAX_PARTITION_FETCH_BYTES = RealtimeCountConfigurations.getStringProperty(0, "max.partition.fetch.bytes");
+ public static final String MAX_POLL_INTERVAL_MS = RealtimeCountConfigurations.getStringProperty(0, "max.poll.interval.ms");
+ public static final String MAX_POLL_RECORDS = RealtimeCountConfigurations.getStringProperty(0, "max.poll.records");
+ public static final String SESSION_TIMEOUT_MS = RealtimeCountConfigurations.getStringProperty(0, "session.timeout.ms");
+ public static final String AUTO_OFFSET_RESET = RealtimeCountConfigurations.getStringProperty(0, "auto.offset.reset");
+ public static final String DATACENTER_ADDRS = RealtimeCountConfigurations.getStringProperty(0, "datacenter.addrs");
+ public static final String DATACENTER_USERNAME = RealtimeCountConfigurations.getStringProperty(0, "datacenter.username");
+ public static final String DATACENTER_PASSWORD = RealtimeCountConfigurations.getStringProperty(0, "datacenter.password");
+ public static final String TABLE_NAME = RealtimeCountConfigurations.getStringProperty(0, "table.name");
+ public static final String TABLE_KILLED_NAME = RealtimeCountConfigurations.getStringProperty(0, "table.killed.name");
+ public static final Integer BATCH_CHINSERT_NUM = RealtimeCountConfigurations.getIntProperty(0, "batch.chinsert.num");//clickhouse批量插入量
+ public static final Integer BATCH_KAFKA_INSERT_NUM = RealtimeCountConfigurations.getIntProperty(0, "batch.kafka.insert.num");//kafka批量插入量
+ public static final Integer BATCH_CHINSERT_KILLED_NUM = RealtimeCountConfigurations.getIntProperty(0, "batch.chinsert.killed.num");
+ public static final String IP_V4_LIBRARY = RealtimeCountConfigurations.getStringProperty(0, "ip.v4.library");//ipv4定位库
+ public static final String IP_V6_LIBRARY = RealtimeCountConfigurations.getStringProperty(0, "ip.v6.library");//ipv6定位库
+ public static final String IPIP_LIBRARY = RealtimeCountConfigurations.getStringProperty(0, "ipip.library");//ipip定位库
+
+ public static final String HDFS_URL = RealtimeCountConfigurations.getStringProperty(0,"hdfs.url");
+ public static final String HDFS_PATH = RealtimeCountConfigurations.getStringProperty(0,"hdfs.path");
+ public static final String HDFS_USER = RealtimeCountConfigurations.getStringProperty(0,"hdfs.user");
+ // public static final String HIVE_URL = RealtimeCountConfigurations.getStringProperty(0,"hive.url");
+// public static final String HIVE_USERNAME = RealtimeCountConfigurations.getStringProperty(0,"hive.username");
+// public static final String HIVE_PASSWORD = RealtimeCountConfigurations.getStringProperty(0,"hive.password");
+ public static final String HIVE_SIP_CLEAN_TABLE = RealtimeCountConfigurations.getStringProperty(0,"hive.sip.clean.table");
+// public static final String HIVE_SIP_ROUTE_TABLE = RealtimeCountConfigurations.getStringProperty(0,"hive.sip.route.table");
+
+ //---------------storm_config.properties---------------
+ public static final Integer SPOUT_PARALLELISM = RealtimeCountConfigurations.getIntProperty(1, "spout.parallelism");
+ public static final Integer FORMAT_BOLT_PARALLELISM = RealtimeCountConfigurations.getIntProperty(1, "format.bolt.parallelism");
+ public static final Integer BUFFER_BOLT_PARALLELISM = RealtimeCountConfigurations.getIntProperty(1, "buffer.bolt.parallelism");
+ public static final Integer DATABASE_BOLT_PARALLELISM = RealtimeCountConfigurations.getIntProperty(1, "database.bolt.parallelism");
+ public static final Integer COUNT_BOLT_PARALLELISM = RealtimeCountConfigurations.getIntProperty(1, "count.bolt.parallelism");
+ public static final Integer MERGE_BOLT_PARALLELISM = RealtimeCountConfigurations.getIntProperty(1, "merge.bolt.parallelism");
+ public static final Integer TOPOLOGY_WORKERS = RealtimeCountConfigurations.getIntProperty(1, "topology.workers");
+ public static final Integer GROUP_STRATEGY = RealtimeCountConfigurations.getIntProperty(1, "group.strategy");
+ public static final Integer TOPOLOGY_TICK_TUPLE_COMP_FREQ_SECS = RealtimeCountConfigurations.getIntProperty(1, "topology.tick.tuple.comp.freq.secs");
+ public static final Integer TOPOLOGY_TICK_TUPLE_FREQ_SECS = RealtimeCountConfigurations.getIntProperty(1, "topology.tick.tuple.freq.secs");
+ public static final Integer TOPOLOGY_TICK_TUPLE_COUNT_FREQ_SECS = RealtimeCountConfigurations.getIntProperty(1, "topology.tick.tuple.count.freq.secs");
+ public static final Integer TOPOLOGY_TICK_TUPLE_MERGE_FREQ_SECS = RealtimeCountConfigurations.getIntProperty(1, "topology.tick.tuple.merge.freq.secs");
+ public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = RealtimeCountConfigurations.getIntProperty(1, "topology.config.max.spout.pending");
+ public static final Integer TOPOLOGY_NUM_ACKS = RealtimeCountConfigurations.getIntProperty(1, "topology.num.acks");
+
+ //参数展示
+ public static void configShow(){
+ System.out.println("BOOTSTRAP_SERVERS: "+BOOTSTRAP_SERVERS);
+ System.out.println("KAFKA_TOPIC: "+KAFKA_TOPIC);
+ System.out.println("ACTIVE_SYSTEM: "+ACTIVE_SYSTEM);
+ System.out.println("GROUP_ID: "+GROUP_ID);
+ System.out.println("GROUP_ID_PREFIX: "+GROUP_ID_PREFIX);
+ System.out.println("AUTO_OFFSET_RESET: "+AUTO_OFFSET_RESET);
+ System.out.println("TOPOLOGY_NUM_ACKS: "+TOPOLOGY_NUM_ACKS);
+ System.out.println("BATCH_INSERT_NUM: "+BATCH_INSERT_NUM);
+ System.out.println("TOPOLOGY_TICK_TUPLE_FREQ_SECS: "+TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+ System.out.println("TOPOLOGY_CONFIG_MAX_SPOUT_PENDING: "+TOPOLOGY_CONFIG_MAX_SPOUT_PENDING);
+ System.out.println("TOPOLOGY_WORKERS: "+TOPOLOGY_WORKERS);
+ System.out.println("SPOUT_PARALLELISM: "+SPOUT_PARALLELISM);
+ System.out.println("FORMAT_BOLT_PARALLELISM: "+FORMAT_BOLT_PARALLELISM);
+ System.out.println("DATABASE_BOLT_PARALLELISM: "+DATABASE_BOLT_PARALLELISM);
+ System.out.println("GROUP_STRATEGY: "+GROUP_STRATEGY);
+ }
+} \ No newline at end of file