| field | value | date |
|---|---|---|
| author | caohui <[email protected]> | 2020-04-29 14:32:05 +0800 |
| committer | caohui <[email protected]> | 2020-04-29 14:32:05 +0800 |
| commit | d15d7536f385ec4a1250ed15ed52fd6c05eb7431 (patch) | |
| tree | 737ec8462ef62ac70caeee1533cbee4e76ceef98 /src/main/java/cn/ac/iie/common | |
Diffstat (limited to 'src/main/java/cn/ac/iie/common')
| mode | file | lines |
|---|---|---|
| -rw-r--r-- | src/main/java/cn/ac/iie/common/CommonService.java | 68 |
| -rw-r--r-- | src/main/java/cn/ac/iie/common/DataCenterLoad.java | 93 |
| -rw-r--r-- | src/main/java/cn/ac/iie/common/HashTableConfig.java | 354 |
| -rw-r--r-- | src/main/java/cn/ac/iie/common/HttpManager.java | 218 |
| -rw-r--r-- | src/main/java/cn/ac/iie/common/RealtimeCountConfig.java | 99 |
5 files changed, 832 insertions, 0 deletions
diff --git a/src/main/java/cn/ac/iie/common/CommonService.java b/src/main/java/cn/ac/iie/common/CommonService.java
new file mode 100644
index 0000000..4cf8ec0
--- /dev/null
+++ b/src/main/java/cn/ac/iie/common/CommonService.java
@@ -0,0 +1,68 @@
+package cn.ac.iie.common;
+
+import java.io.Serializable;
+
+import org.apache.log4j.Logger;
+import cn.ac.iie.bean.ConfigCompile;;
+
+public class CommonService implements Serializable{
+
+    private static final long serialVersionUID = 6106510579752162633L;
+    private static Logger logger = Logger.getLogger(CommonService.class);
+    private static final Integer COMPILE_OPTION_NUMBER = 9;
+
+    public String[] splitMessageWithLogType(String message,String logType) {
+        Integer ZiYong_LOG_LENGTH = RealtimeCountConfig.LOG_COMMON_FIELD_NUM + HashTableConfig.LOG_SUB_OPTION_NUM_MAP.get(logType);
+        if(message!=null && !message.isEmpty()) {
+            String[] values = message.split(RealtimeCountConfig.LOG_STRING_SPLITTER);//分割符"\t"
+            if(values.length == ZiYong_LOG_LENGTH) {
+                return values;
+            } else {
+                logger.error(logType+"--->message length = " + values.length + " is illegal");//测试用
+                return null;
+            }
+        }
+        return null;
+    }
+
+    public ConfigCompile checkReplyFromRedis(String redisReply){
+        if(redisReply == null){
+            return null;
+        }
+        String[] str = redisReply.split("\t");
+        if(str.length == COMPILE_OPTION_NUMBER){
+            try{
+                ConfigCompile cc = new ConfigCompile(str);
+                if(cc.getSERVICE().equals(RealtimeCountConfig.EMPTY_OPTION_CHARACTER)){
+                    logger.error("私有标签为空, 配置id为: "+ cc.getCOMPILE_ID());
+                    return null;
+                }
+                return cc;
+            } catch (Exception e){
+                e.printStackTrace();
+                return null;
+            }
+        }else{
+            return null;
+        }
+    }
+
+    public ConfigCompile checkPz(String pzStr){
+        String[] str = pzStr.split("\t");
+        if(str.length == COMPILE_OPTION_NUMBER){
+            try{
+                ConfigCompile cc = new ConfigCompile(str);
+                if(cc.getSERVICE().equals(RealtimeCountConfig.EMPTY_OPTION_CHARACTER)){
+                    logger.error("私有标签为空, 配置id为: "+ cc.getCOMPILE_ID());
+                    return null;
+                }
+                return cc;
+            } catch (Exception e){
+                e.printStackTrace();
+                return null;
+            }
+        }else{
+            return null;
+        }
+    }
+}
diff --git a/src/main/java/cn/ac/iie/common/DataCenterLoad.java b/src/main/java/cn/ac/iie/common/DataCenterLoad.java
new file mode 100644
index 0000000..eeb1a3a
--- /dev/null
+++ b/src/main/java/cn/ac/iie/common/DataCenterLoad.java
@@ -0,0 +1,93 @@
+package cn.ac.iie.common;
+
+import org.apache.log4j.Logger;
+
+import java.text.SimpleDateFormat;
+import java.util.Map;
+
+public class DataCenterLoad {
+    private static final String url = HttpManager.getInfoLoadInstance().getAddress();
+    private static Logger logger = Logger.getLogger(DataCenterLoad.class);
+
+    public DataCenterLoad() {
+    }
+
+    private String generateTimeWithInterval() {
+        Long stamp = System.currentTimeMillis() + 300000L;
+        Long stamp5 = stamp / 300000 * 300000;
+        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        return df.format(stamp5);
+    }
+
+    public void dfPzFlowBatchStorage(Map<String, Long> pzMap) {
+        //String sql = " insert into DF_PZ_REPORT(STAT_ID, ACTIVE_SYS, CFG_ID, SERVICE, SUM, REPORT_TIME) " +
+        //        " VALUES(SEQ_DF_PZ_REPORT.NEXTVAL, ?, ?, ?, ?, ?)";
+        StringBuffer sb = new StringBuffer();
+        String time5 = generateTimeWithInterval();
+        int nums = 0;
+        for (String key : pzMap.keySet()) {
+            try {
+                String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+                if (options[0] != null && options[0] != "" && options[1] != null && options[1] != "" && options[2] != null && options[2] != "" && options[3] != null && options[3] != "" && options[4] != null && options[4] != "" && options[5] != null && options[5] != "") {
+                    String aItem = options[0] + "," + options[1] + "," + options[2] + "," + options[3] + "," + options[4] + "," + options[5] + "," + pzMap.get(key) + "," + time5;
+                    sb.append(aItem + "\n");
+                    nums++;
+                    if (nums >= RealtimeCountConfig.BATCH_INSERT_NUM) {
+                        String data = sb.substring(0, sb.length() - 1);
+                        //输出的topic
+                        logger.info("start to post data to dc---------> " + data);
+                        System.out.println("start to post data to dc---------> " + data);
+                        HttpManager.getInfoLoadInstance().postToDataCenter(url, "DF_PZ_FLOW_REPORT", data);//原本的方法
+                        sb.setLength(0);
+                        nums = 0;
+                    }
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+        try {
+            if (nums != 0) {
+                String data = sb.substring(0, sb.length() - 1);
+                HttpManager.getInfoLoadInstance().postToDataCenter(url, "DF_PZ_FLOW_REPORT", data);
+                sb.setLength(0);
+                nums = 0;
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void dfPzBatchStorage(Map<String, Long> pzMap) {
+        //String sql = " insert into DF_PZ_REPORT(STAT_ID, ACTIVE_SYS, CFG_ID, SERVICE, SUM, REPORT_TIME) " +
+        //        " VALUES(SEQ_DF_PZ_REPORT.NEXTVAL, ?, ?, ?, ?, ?)";
+        StringBuffer sb = new StringBuffer();
+        String time5 = generateTimeWithInterval();
+        int nums = 0;
+        for (String key : pzMap.keySet()) {
+            try {
+                String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+                String aItem = options[2] + "\t" + options[1] + "\t" + options[3] + "\t" + pzMap.get(key) + "\t" + time5;
+                sb.append(aItem + "\n");
+                nums++;
+                if (nums >= RealtimeCountConfig.BATCH_INSERT_NUM) {
+                    String data = sb.substring(0, sb.length() - 1);
+                    HttpManager.getInfoLoadInstance().postToDataCenter(url, "t_xa_df_pz_report_dt", data);
+                    sb.setLength(0);
+                    nums = 0;
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+        try {
+            if (nums != 0) {
+                String data = sb.substring(0, sb.length() - 1);
+                HttpManager.getInfoLoadInstance().postToDataCenter(url, "t_xa_df_pz_report_dt", data);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+}
diff --git a/src/main/java/cn/ac/iie/common/HashTableConfig.java b/src/main/java/cn/ac/iie/common/HashTableConfig.java
new file mode 100644
index 0000000..4ee640f
--- /dev/null
+++ b/src/main/java/cn/ac/iie/common/HashTableConfig.java
@@ -0,0 +1,354 @@
+package cn.ac.iie.common;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+public class HashTableConfig implements Serializable {
+
+    private static final long serialVersionUID = -6843770738516927321L;
+
+    /**
+     * ISO 3166-1 alpha2
+     */
+    public static final Map<String, String> ISO_3166_1_ALPHA_2 = new HashMap<String, String>() {
+
+        // private static final long serialVersionUID = 5231960246987011322L;
+        private static final long serialVersionUID = -6972673762779232428L;
+
+        {
+
+            put("阿富汗","AF");
+            put("奥兰","AX");
+            put("阿尔巴尼亚","AL");
+            put("阿尔及利亚","DZ");
+            put("美属萨摩亚","AS");
+            put("安道尔","AD");
+            put("安哥拉","AO");
+            put("安圭拉","AI");
+            put("南极洲","AQ");
+            put("安提瓜和巴布达","AG");
+            put("阿根廷","AR");
+            put("亚美尼亚","AM");
+            put("阿鲁巴","AW");
+            put("澳大利亚","AU");
+            put("奥地利","AT");
+            put("阿塞拜疆","AZ");
+            put("巴哈马","BS");
+            put("巴林","BH");
+            put("孟加拉国","BD");
+            put("巴巴多斯","BB");
+            put("白俄罗斯","BY");
+            put("比利时","BE");
+            put("伯利兹","BZ");
+            put("贝宁","BJ");
+            put("百慕大","BM");
+            put("不丹","BT");
+            put("玻利维亚","BO");
+            put("荷兰加勒比区","BQ");
+            put("波斯尼亚和黑塞哥维那","BA");
+            put("博茨瓦纳","BW");
+            put("布韦岛","BV");
+            put("巴西","BR");
+            put("英属印度洋领地","IO");
+            put("文莱","BN");
+            put("保加利亚","BG");
+            put("布基纳法索","BF");
+            put("布隆迪","BI");
+            put("佛得角","CV");
+            put("柬埔寨","KH");
+            put("喀麦隆","CM");
+            put("加拿大","CA");
+            put("开曼群岛","KY");
+            put("中非","CF");
+            put("乍得","TD");
+            put("智利","CL");
+            put("中国","CN");
+            put("圣诞岛","CX");
+            put("科科斯(基林)群岛","CC");
+            put("哥伦比亚","CO");
+            put("科摩罗","KM");
+            put("刚果(布)","CG");
+            put("刚果(金)","CD");
+            put("库克群岛","CK");
+            put("哥斯达黎加","CR");
+            put("科特迪瓦","CI");
+            put("克罗地亚","HR");
+            put("古巴","CU");
+            put("库拉索","CW");
+            put("塞浦路斯","CY");
+            put("捷克","CZ");
+            put("丹麦","DK");
+            put("吉布提","DJ");
+            put("多米尼克","DM");
+            put("多米尼加","DO");
+            put("厄瓜多尔","EC");
+            put("埃及","EG");
+            put("萨尔瓦多","SV");
+            put("赤道几内亚","GQ");
+            put("厄立特里亚","ER");
+            put("爱沙尼亚","EE");
+            put("埃塞俄比亚","ET");
+            put("福克兰群岛","FK");
+            put("法罗群岛","FO");
+            put("斐济","FJ");
+            put("芬兰","FI");
+            put("法国","FR");
+            put("法属圭亚那","GF");
+            put("法属波利尼西亚","PF");
+            put("法属南方和南极洲领地","TF");
+            put("加蓬","GA");
+            put("冈比亚","GM");
+            put("格鲁吉亚","GE");
+            put("德国","DE");
+            put("加纳","GH");
+            put("直布罗陀","GI");
+            put("希腊","GR");
+            put("格陵兰","GL");
+            put("格林纳达","GD");
+            put("瓜德罗普","GP");
+            put("关岛","GU");
+            put("危地马拉","GT");
+            put("根西","GG");
+            put("几内亚","GN");
+            put("几内亚比绍","GW");
+            put("圭亚那","GY");
+            put("海地","HT");
+            put("赫德岛和麦克唐纳群岛","HM");
+            put("梵蒂冈","VA");
+            put("洪都拉斯","HN");
+            put("香港","HK");
+            put("匈牙利","HU");
+            put("冰岛","IS");
+            put("印度","IN");
+            put("印尼","ID");
+            put("伊朗","IR");
+            put("伊拉克","IQ");
+            put("爱尔兰","IE");
+            put("马恩岛","IM");
+            put("以色列","IL");
+            put("意大利","IT");
+            put("牙买加","JM");
+            put("日本","JP");
+            put("泽西","JE");
+            put("约旦","JO");
+            put("哈萨克斯坦","KZ");
+            put("肯尼亚","KE");
+            put("基里巴斯","KI");
+            put("朝鲜","KP");
+            put("韩国","KR");
+            put("科威特","KW");
+            put("吉尔吉斯斯坦","KG");
+            put("老挝","LA");
+            put("拉脱维亚","LV");
+            put("黎巴嫩","LB");
+            put("莱索托","LS");
+            put("利比里亚","LR");
+            put("利比亚","LY");
+            put("列支敦士登","LI");
+            put("立陶宛","LT");
+            put("卢森堡","LU");
+            put("澳门","MO");
+            put("马其顿","MK");
+            put("马达加斯加","MG");
+            put("马拉维","MW");
+            put("马来西亚","MY");
+            put("马尔代夫","MV");
+            put("马里","ML");
+            put("马耳他","MT");
+            put("马绍尔群岛","MH");
+            put("马提尼克","MQ");
+            put("毛里塔尼亚","MR");
+            put("毛里求斯","MU");
+            put("马约特","YT");
+            put("墨西哥","MX");
+            put("密克罗尼西亚联邦","FM");
+            put("摩尔多瓦","MD");
+            put("摩纳哥","MC");
+            put("蒙古国","MN");
+            put("黑山","ME");
+            put("蒙特塞拉特","MS");
+            put("摩洛哥","MA");
+            put("莫桑比克","MZ");
+            put("缅甸","MM");
+            put("纳米比亚","NA");
+            put("瑙鲁","NR");
+            put("尼泊尔","NP");
+            put("荷兰","NL");
+            put("新喀里多尼亚","NC");
+            put("新西兰","NZ");
+            put("尼加拉瓜","NI");
+            put("尼日尔","NE");
+            put("尼日利亚","NG");
+            put("纽埃","NU");
+            put("诺福克岛","NF");
+            put("北马里亚纳群岛","MP");
+            put("挪威","NO");
+            put("阿曼","OM");
+            put("巴基斯坦","PK");
+            put("帕劳","PW");
+            put("巴勒斯坦","PS");
+            put("巴拿马","PA");
+            put("巴布亚新几内亚","PG");
+            put("巴拉圭","PY");
+            put("秘鲁","PE");
+            put("菲律宾","PH");
+            put("皮特凯恩群岛","PN");
+            put("波兰","PL");
+            put("葡萄牙","PT");
+            put("波多黎各","PR");
+            put("卡塔尔","QA");
+            put("留尼汪","RE");
+            put("罗马尼亚","RO");
+            put("俄罗斯","RU");
+            put("卢旺达","RW");
+            put("圣巴泰勒米","BL");
+            put("圣赫勒拿、阿森松和特里斯坦-达库尼亚","SH");
+            put("圣基茨和尼维斯","KN");
+            put("圣卢西亚","LC");
+            put("法属圣马丁","MF");
+            put("圣皮埃尔和密克隆","PM");
+            put("圣文森特和格林纳丁斯","VC");
+            put("萨摩亚","WS");
+            put("圣马力诺","SM");
+            put("圣多美和普林西比","ST");
+            put("沙特阿拉伯","SA");
+            put("塞内加尔","SN");
+            put("塞尔维亚","RS");
+            put("塞舌尔","SC");
+            put("塞拉利昂","SL");
+            put("新加坡","SG");
+            put("圣马丁","SX");
+            put("斯洛伐克","SK");
+            put("斯洛文尼亚","SI");
+            put("所罗门群岛","SB");
+            put("索马里","SO");
+            put("南非","ZA");
+            put("南乔治亚和南桑威奇群岛","GS");
+            put("南苏丹","SS");
+            put("西班牙","ES");
+            put("斯里兰卡","LK");
+            put("苏丹","SD");
+            put("苏里南","SR");
+            put("斯瓦尔巴和扬马延","SJ");
+            put("斯威士兰","SZ");
+            put("瑞典","SE");
+            put("瑞士","CH");
+            put("叙利亚","SY");
+            put("台湾","TW");
+            put("塔吉克斯坦","TJ");
+            put("坦桑尼亚","TZ");
+            put("泰国","TH");
+            put("东帝汶","TL");
+            put("多哥","TG");
+            put("托克劳","TK");
+            put("汤加","TO");
+            put("特立尼达和多巴哥","TT");
+            put("突尼斯","TN");
+            put("土耳其","TR");
+            put("土库曼斯坦","TM");
+            put("特克斯和凯科斯群岛","TC");
+            put("图瓦卢","TV");
+            put("乌干达","UG");
+            put("乌克兰","UA");
+            put("阿联酋","AE");
+            put("英国","GB");
+            put("美国","US");
+            put("美国本土外小岛屿","UM");
+            put("乌拉圭","UY");
+            put("乌兹别克斯坦","UZ");
+            put("瓦努阿图","VU");
+            put("委内瑞拉","VE");
+            put("越南","VN");
+            put("英属维尔京群岛","VG");
+            put("美属维尔京群岛","VI");
+            put("瓦利斯和富图纳","WF");
+            put("西撒哈拉","EH");
+            put("也门","YE");
+            put("赞比亚","ZM");
+            put("津巴布韦","ZW");
+
+        }
+    };
+
+    /**
+     * 业务类型
+     */
+    public static final Map<String, Integer> SERVICE_TYPE_MAP = new HashMap<String, Integer>() {
+        private static final long serialVersionUID = 8445342694006806126L;
+
+        {
+            put("DF-IP-PORT-LOG", 1);
+            put("FX-IP-PORT", 2);
+            put("DF-HTTP-REQ-LOG", 3);
+            put("DF-HTTP-RES-LOG", 4);
+            put("DF-HTTP-KEYWORD-LOG", 5);
+            put("DF-DNS-LOG", 6);
+            put("DF-PPTP-LOG", 7);
+            put("DF-L2TP-LOG", 8);
+            put("DF-IPSEC-LOG", 9);
+            put("DF-OPENVPN-LOG", 10);
+            put("DF-SSH-LOG", 11);
+            put("DF-SSL-LOG", 12);
+            put("DF-MAIL-LOG", 13);
+            put("DF-FTP-LOG", 14);
+
+            put("DJ-IP-PORT-LOG", 48);
+            put("DJ-HTTP-REQ-LOG", 49);
+            put("DJ-HTTP-RES-LOG", 50);
+            put("DJ-HTTP-KEYWORD-LOG", 51);
+            put("DJ-DNS-LOG", 52);
+            put("DJ-FTP-LOG", 53);
+            put("DJ-PPTP-LOG", 54);
+            put("DJ-L2TP-LOG", 55);
+            put("DJ-IPSEC-LOG", 56);
+            put("DJ-OPENVPN-LOG", 57);
+            put("DJ-SSH-LOG", 58);
+            put("DJ-SSL-LOG", 59);
+            put("DJ-MAIL-LOG", 60);
+        }
+    };
+
+    //日志表字段数(不包括id,因为id前面不传回来)-----20181228修改
+    public static final Map<String, Integer> LOG_SUB_OPTION_NUM_MAP = new HashMap<String, Integer>() {
+
+        private static final long serialVersionUID = 5231960246987011322L;
+
+        {
+            put("DF-IP-PORT-LOG", 0);
+            put("DF-HTTP-REQ-LOG", 12);
+            put("DF-HTTP-RES-LOG", 15);
+            put("DF-HTTP-KEYWORD-LOG", 10);
+            put("DF-MAIL-LOG", 5);
+            put("DF-DNS-LOG", 9);
+            put("DF-FTP-LOG", 1);
+            put("DF-PPTP-LOG", 3);
+            put("DF-L2TP-LOG", 4);
+            put("DF-IPSEC-LOG", 2);
+            put("DF-OPENVPN-LOG", 4);
+            put("DF-SSH-LOG", 6);
+            put("DF-SSL-LOG", 6);
+            put("DF-TUNNEL-RANDOM-LOG", 1);
+
+            put("NTC-CONN-RECORD-LOG", 5);//正式用,这是表上的字段数
+//            put("NTC-CONN-RECORD-LOG", 3);//测试用,这是测试数据上的字段数
+
+
+            put("DJ-IP-PORT-LOG", 0);
+            put("DJ-HTTP-REQ-LOG", 12);
+            put("DJ-HTTP-RES-LOG", 15);
+            put("DJ-HTTP-KEYWORD-LOG", 10);
+            put("DJ-MAIL-LOG", 5);
+            put("DJ-DNS-LOG", 9);
+            put("DJ-FTP-LOG", 1);
+            put("DJ-PPTP-LOG", 3);
+            put("DJ-L2TP-LOG", 4);
+            put("DJ-IPSEC-LOG", 2);
+            put("DJ-OPENVPN-LOG", 4);
+            put("DJ-SSH-LOG", 6);
+            put("DJ-SSL-LOG", 6);
+
+        }
+    };
+
+}
\ No newline at end of file
diff --git a/src/main/java/cn/ac/iie/common/HttpManager.java b/src/main/java/cn/ac/iie/common/HttpManager.java
new file mode 100644
index 0000000..a1aa590
--- /dev/null
+++ b/src/main/java/cn/ac/iie/common/HttpManager.java
@@ -0,0 +1,218 @@
+package cn.ac.iie.common;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.MalformedURLException;
+import java.net.UnknownHostException;
+import java.nio.charset.Charset;
+import java.util.Random;
+import javax.net.ssl.SSLException;
+
+//import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpEntityEnclosingRequest;
+import org.apache.http.HttpRequest;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.HttpRequestRetryHandler;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.client.LaxRedirectStrategy;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.util.EntityUtils;
+import org.apache.log4j.Logger;
+
+
+public class HttpManager {
+    // 创建httpclient连接池
+    private PoolingHttpClientConnectionManager httpClientConnectionManager = null;
+    private CloseableHttpClient httpClient = null;
+    //类初始化时,自动实例化,饿汉单例模式
+    private static final HttpManager manager = new HttpManager();
+    private static Logger logger = Logger.getLogger(HttpManager.class);
+
+    public static HttpManager getInfoLoadInstance(){
+        return manager;
+    }
+
+    private HttpManager(){
+        //初始化httpClient
+        initHttpClient();
+        System.setProperty("sun.net.inetaddr.ttl", "300");
+        System.setProperty("sun.net.inetaddr.negative.ttl", "10");
+    }
+
+    public void initHttpClient(){
+        //创建httpclient连接池
+        httpClientConnectionManager = new PoolingHttpClientConnectionManager();
+        //设置连接池最大数量
+        httpClientConnectionManager.setMaxTotal(2000);
+        //设置单个路由最大连接数量
+        httpClientConnectionManager.setDefaultMaxPerRoute(400);
+
+        httpClient=getHttpClient();
+    }
+    //请求重试机制
+
+    HttpRequestRetryHandler myRetryHandler = new HttpRequestRetryHandler() {
+        @Override
+        public boolean retryRequest(IOException exception, int executionCount, HttpContext context) {
+            if (executionCount >= 2) {
+                // 超过两次则不再重试请求
+                logger.error("http连接已重试"+executionCount+"次, 重试失败");
+                return false;
+            }
+            if (exception instanceof InterruptedIOException) {
+                // Timeout
+                logger.error("InterruptedIOException, 重试连接。。。");
+                return true;
+            }
+            if (exception instanceof UnknownHostException) {
+                // Unknown host
+                return false;
+            }
+            if (exception instanceof ConnectTimeoutException) {
+                logger.error("ConnectTimeoutException, 重试连接。。。");
+                // Connection refused
+                return true;
+            }
+            if (exception instanceof SSLException) {
+                // SSL handshake exception
+                return false;
+            }
+            HttpClientContext clientContext = HttpClientContext.adapt(context);
+            HttpRequest request = clientContext.getRequest();
+            boolean idempotent = !(request instanceof HttpEntityEnclosingRequest);
+            if (idempotent) {
+                logger.error("request is idempotent, 重试连接。。。");
+                // Retry if the request is considered idempotent
+                return true;
+            }
+            return false;
+        }
+    };
+
+    public CloseableHttpClient getHttpClient(){
+        // 创建全局的requestConfig
+        RequestConfig requestConfig = RequestConfig.custom()
+                .setConnectTimeout(3000)
+                .setSocketTimeout(3000)
+                //.setCookieSpec(CookieSpecs.BEST_MATCH)
+                .build();
+        // 声明重定向策略对象
+        LaxRedirectStrategy redirectStrategy = new LaxRedirectStrategy();
+
+        CloseableHttpClient httpClient = HttpClients.custom().setConnectionManager(httpClientConnectionManager)
+                .setDefaultRequestConfig(requestConfig)
+                .setRedirectStrategy(redirectStrategy)
+                .setRetryHandler(myRetryHandler)
+                .build();
+        return httpClient;
+    }
+
+    public String getAddress(){
+        String[] addrs = RealtimeCountConfig.DATACENTER_ADDRS.split(",");
+
+        Random rnd = new Random();
+        Integer addrIndex = rnd.nextInt(addrs.length);
+        return addrs[addrIndex].trim();
+    }
+
+
+    public void postToDataCenter(String url, String topic, String data){
+        CloseableHttpResponse response = null;
+        HttpPost httpPost = null;
+        url = url.trim();
+        try {
+            httpPost = new HttpPost(url);
+//            httpPost.addHeader("Connection","keep-alive");
+//            httpPost.addHeader("Accept-Encoding", "gzip, deflate");
+            //httpPost.addHeader("User-Agent", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36");
+
+//            httpPost.addHeader("User", RealtimeCountConfig.DATACENTER_USERNAME);
+//            httpPost.addHeader("Password", RealtimeCountConfig.DATACENTER_PASSWORD);
+            httpPost.addHeader("Topic", topic);
+            httpPost.addHeader("Schema-Version", "2");
+            httpPost.addHeader("Format", "csv");
+//            httpPost.addHeader("Row-Split", "\\n");
+//            httpPost.addHeader("Field-Split", "\\t");
+            httpPost.addHeader("Row-Split", "\\n");
+            httpPost.addHeader("Field-Split", ",");
+//            StringEntity payload = new StringEntity(data, Charset.forName("utf-8"));
+            StringEntity payload = new StringEntity(data);
+            //payload.setContentType("text/xml; charset=UTF-8");
+//            payload.setContentEncoding("utf-8");
+            httpPost.setEntity(payload);
+            logger.info("数据中心加载内容: " + data);
+            //执行请求
+            response = httpClient.execute(httpPost);
+            try{
+                int statuCode = response.getStatusLine().getStatusCode();
+                //Header[] headers = response.getAllHeaders();
+                //logger.info("<<response header>>:");
+                //System.out.println("<<response header>>:");
+                //for(int i=0; i<headers.length; i++){
+                //    logger.info(headers[i].getName() +" : "+headers[i].getValue());
+                //System.out.println(headers[i].getName() +" : "+headers[i].getValue());
+                //}
+                HttpEntity entity = response.getEntity();
+                if(statuCode==200){
+                    logger.info("数据中心加载成功, 返回码: "+ statuCode);
+                    System.out.println("数据中心加载成功, 返回码: " + statuCode);
+                    EntityUtils.consume(entity);
+                }else{
+                    String ret = EntityUtils.toString(entity);
+                    EntityUtils.consume(entity);
+                    logger.info("数据中心加载失败: "+ret+" --- code: "+statuCode+" ---失败数据为: \n"+data);
+                    System.out.println("数据中心加载失败: " + ret + " --- code: " + statuCode + " ---失败数据为: \n" + data);
+                    logger.error("数据中心加载失败: "+ret+" --- code: "+statuCode);
+                    System.out.println("数据中心加载失败: " + ret + " --- code: " + statuCode);
+                }
+            } catch (Exception e){
+                e.printStackTrace();
+            }
+
+        } catch (MalformedURLException e) {
+            //执行URL url = new URL()的异常
+            e.printStackTrace();
+        } catch (ClientProtocolException e) {
+            // 执行httpClient.execute(httpGet)的异常
+            e.printStackTrace();
+        } catch (IOException e) {
+            // 执行httpClient.execute(httpGet)的异常
+            e.printStackTrace();
+        } finally{
+            if(response != null){
+                try {
+                    response.close();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+            httpPost.abort();
+            /**
+             * httpclient的链接有线程池管理,这里不用直接关闭
+             */
+//            try {//关闭连接
+//                httpClient.close();
+//            } catch (IOException e) {
+//                e.printStackTrace();
+//            }
+        }
+    }
+
+//    public static void main(String[] args) throws InterruptedException {
+//        // TODO Auto-generated method stub
+//        for(int i=0; i<100000; i++){
+//            System.out.println("------------- "+i+" ------------");
+//            DoubleWriteHttpManager.getInfoLoadInstance().postToDataCenter("http://www.runoob.com/try/ajax/demo_test_post.php", "topic", "data");
+//        }
+//    }
+
+}
diff --git a/src/main/java/cn/ac/iie/common/RealtimeCountConfig.java b/src/main/java/cn/ac/iie/common/RealtimeCountConfig.java
new file mode 100644
index 0000000..0184f45
--- /dev/null
+++ b/src/main/java/cn/ac/iie/common/RealtimeCountConfig.java
@@ -0,0 +1,99 @@
+package cn.ac.iie.common;
+
+import cn.ac.iie.utils.RealtimeCountConfigurations;
+
+import java.io.Serializable;
+
+public class RealtimeCountConfig implements Serializable{
+
+    private static final long serialVersionUID = -8649024767966235184L;
+    public static final String LOG_STRING_SPLITTER = "\t";
+    public static final String BETWEEN_BOLTS_SPLITTER = "~=~";
+    public static final String EMPTY_OPTION_CHARACTER = "-";
+    /**
+     * 通用log表字段数
+     */
+    public static final Integer LOG_COMMON_FIELD_NUM = 23;//公共表字段数(不包括id,因为id前面不传回来,id为自增)
+
+    //-----------------realtime_config.properties------------------
+    public static final String BOOTSTRAP_SERVERS = RealtimeCountConfigurations.getStringProperty(0, "bootstrap.servers");
+    public static final String BOOTSTRAP_OUTPUT_SERVERS = RealtimeCountConfigurations.getStringProperty(0, "bootstrap.output.servers");
+    public static final String ACTIVE_SYSTEM = RealtimeCountConfigurations.getStringProperty(0, "active.system");
+    public static final Integer BATCH_INSERT_NUM = RealtimeCountConfigurations.getIntProperty(0, "batch.insert.num");
+    public static final String GROUP_ID = RealtimeCountConfigurations.getStringProperty(0, "group.id");
+    public static final String KAFKA_TOPIC = RealtimeCountConfigurations.getStringProperty(0, "kafka.topic");
+    public static final String KAFKA_NTC_ORI_TOPIC = RealtimeCountConfigurations.getStringProperty(0, "kafka.ntc.ori.topic");
+    public static final String KAFKA_SIP_ORIGIN_TOPIC = RealtimeCountConfigurations.getStringProperty(0, "kafka.sip.origin.topic");
+    public static final String KAFKA_NTC_KILLED_TOPIC = RealtimeCountConfigurations.getStringProperty(0, "kafka.ntc.killed.topic");
+    public static final String KAFKA_SIP_COMPLEMENT_TOPIC = RealtimeCountConfigurations.getStringProperty(0, "kafka.sip.complement.topic");
+    public static final String KAFKA_ROUTE_RELATION_TOPIC = RealtimeCountConfigurations.getStringProperty(0, "kafka.route.relation.topic");
+
+
+    public static final String ALL_LOG_OUTPUT_CONTROLLER = RealtimeCountConfigurations.getStringProperty(0, "all.log.output.controller");//全局catch日志打印控制器
+    public static final String PART_LOG_OUTPUT_CONTROLLER = RealtimeCountConfigurations.getStringProperty(0, "part.log.output.controller");//局部日志输出控制器
+
+
+    public static final String GROUP_ID_PREFIX = RealtimeCountConfigurations.getStringProperty(0, "group.id.prefix");//groupid前缀
+    public static final String GROUP_ID_SUFFIX = RealtimeCountConfigurations.getStringProperty(0, "group.id.suffix");//groupid后缀
+    public static final String FETCH_MAX_BYTES = RealtimeCountConfigurations.getStringProperty(0, "fetch.max.bytes");
+    public static final String MAX_PARTITION_FETCH_BYTES = RealtimeCountConfigurations.getStringProperty(0, "max.partition.fetch.bytes");
+    public static final String MAX_POLL_INTERVAL_MS = RealtimeCountConfigurations.getStringProperty(0, "max.poll.interval.ms");
+    public static final String MAX_POLL_RECORDS = RealtimeCountConfigurations.getStringProperty(0, "max.poll.records");
+    public static final String SESSION_TIMEOUT_MS = RealtimeCountConfigurations.getStringProperty(0, "session.timeout.ms");
+    public static final String AUTO_OFFSET_RESET = RealtimeCountConfigurations.getStringProperty(0, "auto.offset.reset");
+    public static final String DATACENTER_ADDRS = RealtimeCountConfigurations.getStringProperty(0, "datacenter.addrs");
+    public static final String DATACENTER_USERNAME = RealtimeCountConfigurations.getStringProperty(0, "datacenter.username");
+    public static final String DATACENTER_PASSWORD = RealtimeCountConfigurations.getStringProperty(0, "datacenter.password");
+    public static final String TABLE_NAME = RealtimeCountConfigurations.getStringProperty(0, "table.name");
+    public static final String TABLE_KILLED_NAME = RealtimeCountConfigurations.getStringProperty(0, "table.killed.name");
+    public static final Integer BATCH_CHINSERT_NUM = RealtimeCountConfigurations.getIntProperty(0, "batch.chinsert.num");//clickhouse批量插入量
+    public static final Integer BATCH_KAFKA_INSERT_NUM = RealtimeCountConfigurations.getIntProperty(0, "batch.kafka.insert.num");//kafka批量插入量
+    public static final Integer BATCH_CHINSERT_KILLED_NUM = RealtimeCountConfigurations.getIntProperty(0, "batch.chinsert.killed.num");
+    public static final String IP_V4_LIBRARY = RealtimeCountConfigurations.getStringProperty(0, "ip.v4.library");//ipv4定位库
+    public static final String IP_V6_LIBRARY = RealtimeCountConfigurations.getStringProperty(0, "ip.v6.library");//ipv6定位库
+    public static final String IPIP_LIBRARY = RealtimeCountConfigurations.getStringProperty(0, "ipip.library");//ipip定位库
+
+    public static final String HDFS_URL = RealtimeCountConfigurations.getStringProperty(0,"hdfs.url");
+    public static final String HDFS_PATH = RealtimeCountConfigurations.getStringProperty(0,"hdfs.path");
+    public static final String HDFS_USER = RealtimeCountConfigurations.getStringProperty(0,"hdfs.user");
+    // public static final String HIVE_URL = RealtimeCountConfigurations.getStringProperty(0,"hive.url");
+//    public static final String HIVE_USERNAME = RealtimeCountConfigurations.getStringProperty(0,"hive.username");
+//    public static final String HIVE_PASSWORD = RealtimeCountConfigurations.getStringProperty(0,"hive.password");
+    public static final String HIVE_SIP_CLEAN_TABLE = RealtimeCountConfigurations.getStringProperty(0,"hive.sip.clean.table");
+//    public static final String HIVE_SIP_ROUTE_TABLE = RealtimeCountConfigurations.getStringProperty(0,"hive.sip.route.table");
+
+    //---------------storm_config.properties---------------
+    public static final Integer SPOUT_PARALLELISM = RealtimeCountConfigurations.getIntProperty(1, "spout.parallelism");
+    public static final Integer FORMAT_BOLT_PARALLELISM = RealtimeCountConfigurations.getIntProperty(1, "format.bolt.parallelism");
+    public static final Integer BUFFER_BOLT_PARALLELISM = RealtimeCountConfigurations.getIntProperty(1, "buffer.bolt.parallelism");
+    public static final Integer DATABASE_BOLT_PARALLELISM = RealtimeCountConfigurations.getIntProperty(1, "database.bolt.parallelism");
+    public static final Integer COUNT_BOLT_PARALLELISM = RealtimeCountConfigurations.getIntProperty(1, "count.bolt.parallelism");
+    public static final Integer MERGE_BOLT_PARALLELISM = RealtimeCountConfigurations.getIntProperty(1, "merge.bolt.parallelism");
+    public static final Integer TOPOLOGY_WORKERS = RealtimeCountConfigurations.getIntProperty(1, "topology.workers");
+    public static final Integer GROUP_STRATEGY = RealtimeCountConfigurations.getIntProperty(1, "group.strategy");
+    public static final Integer TOPOLOGY_TICK_TUPLE_COMP_FREQ_SECS = RealtimeCountConfigurations.getIntProperty(1, "topology.tick.tuple.comp.freq.secs");
+    public static final Integer TOPOLOGY_TICK_TUPLE_FREQ_SECS = RealtimeCountConfigurations.getIntProperty(1, "topology.tick.tuple.freq.secs");
+    public static final Integer TOPOLOGY_TICK_TUPLE_COUNT_FREQ_SECS = RealtimeCountConfigurations.getIntProperty(1, "topology.tick.tuple.count.freq.secs");
+    public static final Integer TOPOLOGY_TICK_TUPLE_MERGE_FREQ_SECS = RealtimeCountConfigurations.getIntProperty(1, "topology.tick.tuple.merge.freq.secs");
+    public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = RealtimeCountConfigurations.getIntProperty(1, "topology.config.max.spout.pending");
+    public static final Integer TOPOLOGY_NUM_ACKS = RealtimeCountConfigurations.getIntProperty(1, "topology.num.acks");
+
+    //参数展示
+    public static void configShow(){
+        System.out.println("BOOTSTRAP_SERVERS: "+BOOTSTRAP_SERVERS);
+        System.out.println("KAFKA_TOPIC: "+KAFKA_TOPIC);
+        System.out.println("ACTIVE_SYSTEM: "+ACTIVE_SYSTEM);
+        System.out.println("GROUP_ID: "+GROUP_ID);
+        System.out.println("GROUP_ID_PREFIX: "+GROUP_ID_PREFIX);
+        System.out.println("AUTO_OFFSET_RESET: "+AUTO_OFFSET_RESET);
+        System.out.println("TOPOLOGY_NUM_ACKS: "+TOPOLOGY_NUM_ACKS);
+        System.out.println("BATCH_INSERT_NUM: "+BATCH_INSERT_NUM);
+        System.out.println("TOPOLOGY_TICK_TUPLE_FREQ_SECS: "+TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+        System.out.println("TOPOLOGY_CONFIG_MAX_SPOUT_PENDING: "+TOPOLOGY_CONFIG_MAX_SPOUT_PENDING);
+        System.out.println("TOPOLOGY_WORKERS: "+TOPOLOGY_WORKERS);
+        System.out.println("SPOUT_PARALLELISM: "+SPOUT_PARALLELISM);
+        System.out.println("FORMAT_BOLT_PARALLELISM: "+FORMAT_BOLT_PARALLELISM);
+        System.out.println("DATABASE_BOLT_PARALLELISM: "+DATABASE_BOLT_PARALLELISM);
+        System.out.println("GROUP_STRATEGY: "+GROUP_STRATEGY);
+    }
+}
\ No newline at end of file
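As a rough usage sketch (again, not part of the commit), the snippet below mirrors how DataCenterLoad buckets counts into 5-minute windows and batches them into CSV payloads before handing them to HttpManager.postToDataCenter with Format=csv and Field-Split=",". The map keys, counts, and the BATCH_INSERT_NUM value are invented for illustration, and the payload is printed here instead of being posted over HTTP.

```java
import java.text.SimpleDateFormat;
import java.util.LinkedHashMap;
import java.util.Map;

public class BatchPostSketch {
    // Illustrative stand-in for RealtimeCountConfig.BATCH_INSERT_NUM.
    static final int BATCH_INSERT_NUM = 2;

    // Same arithmetic as DataCenterLoad.generateTimeWithInterval():
    // add 5 minutes, then floor to a 5-minute boundary, i.e. the end
    // of the current 5-minute window.
    static String generateTimeWithInterval() {
        long stamp = System.currentTimeMillis() + 300000L;
        long stamp5 = stamp / 300000L * 300000L;
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(stamp5);
    }

    public static void main(String[] args) {
        // Hypothetical counter map keyed like the bolts' output
        // ("~=~"-joined dimensions); values are counts for one window.
        Map<String, Long> pzMap = new LinkedHashMap<>();
        pzMap.put("sys1~=~1001~=~HTTP~=~CN~=~US~=~in", 42L);
        pzMap.put("sys1~=~1002~=~DNS~=~CN~=~JP~=~out", 7L);
        pzMap.put("sys2~=~1003~=~SSL~=~CN~=~DE~=~in", 3L);

        String time5 = generateTimeWithInterval();
        StringBuilder sb = new StringBuilder();
        int nums = 0;
        for (Map.Entry<String, Long> e : pzMap.entrySet()) {
            String[] options = e.getKey().split("~=~");
            // One CSV row per key, ending with the count and window timestamp.
            sb.append(String.join(",", options))
              .append(',').append(e.getValue())
              .append(',').append(time5).append('\n');
            if (++nums >= BATCH_INSERT_NUM) {
                // In the commit this payload goes to HttpManager.postToDataCenter(...);
                // here the trailing newline is stripped and the batch is printed.
                System.out.println(sb.substring(0, sb.length() - 1));
                sb.setLength(0);
                nums = 0;
            }
        }
        if (nums != 0) {
            System.out.println(sb.substring(0, sb.length() - 1));
        }
    }
}
```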
