summaryrefslogtreecommitdiff
path: root/src/main/java/cn/ac/iie/dao
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/cn/ac/iie/dao')
-rw-r--r--src/main/java/cn/ac/iie/dao/DataBaseBusiness.java1604
-rw-r--r--src/main/java/cn/ac/iie/dao/DataBaseLoad.java166
-rw-r--r--src/main/java/cn/ac/iie/dao/DataBasePzBusiness.java122
-rw-r--r--src/main/java/cn/ac/iie/dao/DbConnect.java102
-rw-r--r--src/main/java/cn/ac/iie/dao/JdbcConnectionManager.java392
-rw-r--r--src/main/java/cn/ac/iie/dao/JdbcPzConnectionManager.java392
-rw-r--r--src/main/java/cn/ac/iie/dao/KafkaDB.java81
7 files changed, 2859 insertions, 0 deletions
diff --git a/src/main/java/cn/ac/iie/dao/DataBaseBusiness.java b/src/main/java/cn/ac/iie/dao/DataBaseBusiness.java
new file mode 100644
index 0000000..9fae542
--- /dev/null
+++ b/src/main/java/cn/ac/iie/dao/DataBaseBusiness.java
@@ -0,0 +1,1604 @@
+package cn.ac.iie.dao;
+
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+import org.apache.log4j.Logger;
+import cn.ac.iie.common.RealtimeCountConfig;
+
+
+public final class DataBaseBusiness {
+ private static final JdbcConnectionManager manager = JdbcConnectionManager.getInstance();
+ private static final Logger logger = Logger.getLogger(DataBaseBusiness.class);
+ private Connection connection;
+ private PreparedStatement pstmt;
+
+ public DataBaseBusiness(){
+
+ }
+
+ private Long generateTimeWithInterval() throws ParseException{
+ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ String dt = df.format(new Date());
+ System.out.println(dt);
+
+ String[] s = dt.split(":");
+ Integer i = Integer.valueOf(s[1]);
+ String ret = "";
+ if(i<5){
+ ret = s[0]+":00:00";
+ } else if(i<10){
+ ret = s[0]+":05:00";
+ } else if(i<15){
+ ret = s[0]+":10:00";
+ } else if(i<20){
+ ret = s[0]+":15:00";
+ } else if(i<25){
+ ret = s[0]+":20:00";
+ } else if(i<30){
+ ret = s[0]+":25:00";
+ } else if(i<35){
+ ret = s[0]+":30:00";
+ } else if(i<40){
+ ret = s[0]+":35:00";
+ } else if(i<45){
+ ret = s[0]+":40:00";
+ } else if(i<50){
+ ret = s[0]+":45:00";
+ } else if(i<55){
+ ret = s[0]+":50:00";
+ } else if(i<=59){
+ ret = s[0]+":55:00";
+ }
+
+ Date date=df.parse(ret);
+ Long l = date.getTime()+300000;
+ return l;
+ }
+
+ @SuppressWarnings("unused")
+ private void setBigDecimalNullable(PreparedStatement pstmts, int index, String str) throws Exception {
+ if (str.equals(RealtimeCountConfig.EMPTY_OPTION_CHARACTER) || str.isEmpty()) {
+ pstmts.setNull(index, Types.BIGINT);
+ } else {
+ pstmts.setBigDecimal(index, new BigDecimal(str));
+ }
+ }
+
+ @SuppressWarnings("unused")
+ private void setBigDecimal(PreparedStatement pstmts, int index, String str) throws Exception {
+ pstmts.setBigDecimal(index, new BigDecimal(str));
+ }
+
+ @SuppressWarnings("unused")
+ private void setLongNullable(PreparedStatement pstmts, int index, String str) throws Exception {
+ if (str.equals(RealtimeCountConfig.EMPTY_OPTION_CHARACTER) || str.isEmpty()) {
+ pstmts.setNull(index, Types.BIGINT);
+ } else {
+ pstmts.setLong(index, Long.parseLong(str));
+ }
+ }
+
+ private void setLong(PreparedStatement pstmts, int index, String str) throws Exception {
+ pstmts.setLong(index, Long.parseLong(str));
+ }
+
+ @SuppressWarnings("unused")
+ private void setIntNullable(PreparedStatement pstmts, int index, String str) throws Exception {
+ if (str.equals(RealtimeCountConfig.EMPTY_OPTION_CHARACTER) || str.isEmpty()) {
+ pstmts.setNull(index, Types.INTEGER);
+ } else {
+ pstmts.setInt(index, Integer.parseInt(str));
+ }
+ }
+
+ private void setInt(PreparedStatement pstmts, int index, String str) throws Exception {
+ pstmts.setInt(index, Integer.parseInt(str));
+ }
+
+ private void setStringNullable(PreparedStatement pstmts, int index, String str, Integer maxLen) throws Exception {
+ if(str.equals(RealtimeCountConfig.EMPTY_OPTION_CHARACTER) || str.isEmpty()){
+ pstmts.setString(index, "");
+ } else {
+ int byteLen = str.getBytes("utf-8").length;
+ if(byteLen>maxLen){
+ pstmts.setString(index, str.substring(0, str.length()-(byteLen-maxLen)));
+ } else {
+ pstmts.setString(index, str);
+ }
+ }
+ }
+
+ private void setString(PreparedStatement pstmts, int index, String str, Integer maxLen) throws Exception {
+ if(str.equals(RealtimeCountConfig.EMPTY_OPTION_CHARACTER) || str.isEmpty()){
+ throw new SQLException("字符串为空,如字符串可空,请使用setStringNullable函数");
+ } else {
+ int byteLen = str.getBytes("utf-8").length;
+ if(byteLen>maxLen){
+ pstmts.setString(index, str.substring(0, str.length()-(byteLen-maxLen)));
+ } else {
+ pstmts.setString(index, str);
+ }
+ }
+ }
+
+ @SuppressWarnings("unused")
+ private void setTimeStampNullable(PreparedStatement pstmts, int index, String str) throws Exception {
+ if (str.equals(RealtimeCountConfig.EMPTY_OPTION_CHARACTER) || str.isEmpty()) {
+ pstmts.setNull(index, Types.TIMESTAMP);
+ } else {
+ pstmts.setTimestamp(index, new Timestamp(Long.parseLong(str + "000")));
+ }
+ }
+
+ @SuppressWarnings("unused")
+ private void setTimeStamp(PreparedStatement pstmts, int index, String str) throws Exception {
+ if(str.equals(RealtimeCountConfig.EMPTY_OPTION_CHARACTER) || str.isEmpty()){
+ throw new SQLException("时间为空,如字符串可空,请使用setTimeStampNullable函数");
+ } else {
+ pstmts.setTimestamp(index, new Timestamp(Long.parseLong(str + "000")));
+ }
+ }
+
+ private void generateTimeStamp(PreparedStatement pstmts, int index) throws Exception {
+ pstmts.setTimestamp(index, new Timestamp(generateTimeWithInterval()));
+ }
+
+ public void dfPzBatchStorage(Map<String, Long> pzMap) {
+ String sql = " insert into DF_PZ_REPORT(STAT_ID, ACTIVE_SYS, CFG_ID, SERVICE, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DF_PZ_REPORT.NEXTVAL, ?, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : pzMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setLong(pstmt, 2, options[2]);
+ setInt(pstmt, 3, options[3]);
+ pstmt.setLong(4, pzMap.get(key));
+ generateTimeStamp(pstmt, 5);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+
+ public void dfServiceBatchStorage(Map<String, Long> serviceMap) {
+ String sql = " insert into DF_SERVICE_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DF_SERVICE_REPORT.NEXTVAL, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : serviceMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setInt(pstmt, 2, options[2]);
+ pstmt.setLong(3, serviceMap.get(key));
+ generateTimeStamp(pstmt, 4);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+
+ public void dfTagBatchStorage(Map<String, Long> tagMap) {
+ String sql = " insert into DF_TAG_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, TAG, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DF_TAG_REPORT.NEXTVAL, ?, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : tagMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setInt(pstmt, 2, options[2]);
+ setInt(pstmt, 3, options[3]);
+ pstmt.setLong(4, tagMap.get(key));
+ generateTimeStamp(pstmt, 5);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+
+ public void dfSrcipDomesticBatchStorage(Map<String, Long> srcipMap) {
+ String sql = " insert into DF_SRCIP_DOMESTIC_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, SRC_PROVINCE, SRC_CITY, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DF_SRCIP_DOMESTIC_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : srcipMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setInt(pstmt, 2, options[2]);
+ setString(pstmt, 3, options[3], 256);
+ setStringNullable(pstmt, 4, options[4], 256);
+ pstmt.setLong(5, srcipMap.get(key));
+ generateTimeStamp(pstmt, 6);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+
+ public void dfDestipCountryBatchStorage(Map<String, Long> srcipMap) {
+ String sql = " insert into DF_DESTIP_COUNTRY_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, DEST_COUNTRY, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DF_DESTIP_COUNTRY_REPORT.NEXTVAL, ?, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : srcipMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setInt(pstmt, 2, options[2]);
+ setString(pstmt, 3, options[3], 256);
+ pstmt.setLong(4, srcipMap.get(key));
+ generateTimeStamp(pstmt, 5);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+
+ public void djPzBatchStorage(Map<String, Long> pzMap) {
+ String sql = " insert into DJ_PZ_REPORT(STAT_ID, ACTIVE_SYS, CFG_ID, SERVICE, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DJ_PZ_REPORT.NEXTVAL, ?, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : pzMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setLong(pstmt, 2, options[2]);
+ setInt(pstmt, 3, options[3]);
+ pstmt.setLong(4, pzMap.get(key));
+ generateTimeStamp(pstmt, 5);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+ public void dfAttrTypeBatchStorage(Map<String, Long> attrTypeMap) {
+ String sql = " insert into DF_ATTR_TYPE_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, ATTR_TYPE, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DF_ATTR_TYPE_REPORT.NEXTVAL, ?, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : attrTypeMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setInt(pstmt, 2, options[2]);
+ setInt(pstmt, 3, options[3]);
+ pstmt.setLong(4, attrTypeMap.get(key));
+ generateTimeStamp(pstmt, 5);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+ public void dfEntranceBatchStorage(Map<String, Long> entranceMap) {
+ String sql = " insert into DF_ENTRANCE_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, ENTRANCE_ID, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DF_ENTRANCE_REPORT.NEXTVAL, ?, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : entranceMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setInt(pstmt, 2, options[2]);
+ setLong(pstmt, 3, options[3]);
+ pstmt.setLong(4, entranceMap.get(key));
+ generateTimeStamp(pstmt, 5);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+ public void dfLwhhBatchStorage(Map<String, Long> lwhhMap) {
+ String sql = " insert into DF_LWHH_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, LWHH, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DF_LWHH_REPORT.NEXTVAL, ?, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : lwhhMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setInt(pstmt, 2, options[2]);
+ setInt(pstmt, 3, options[3]);
+ pstmt.setLong(4, lwhhMap.get(key));
+ generateTimeStamp(pstmt, 5);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+ public void dfLwhhTypeBatchStorage(Map<String, Long> lwhhTypeMap) {
+ String sql = " insert into DF_LWHH_TYPE_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, LWHH, ATTR_TYPE, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DF_LWHH_TYPE_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : lwhhTypeMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setInt(pstmt, 2, options[2]);
+ setInt(pstmt, 3, options[3]);
+ setInt(pstmt, 4, options[4]);
+ pstmt.setLong(5, lwhhTypeMap.get(key));
+ generateTimeStamp(pstmt, 6);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+ public void dfLwhhTagBatchStorage(Map<String, Long> lwhhTagMap) {
+ String sql = " insert into DF_LWHH_TAG_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, LWHH, TAG, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DF_LWHH_TAG_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : lwhhTagMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setInt(pstmt, 2, options[2]);
+ setInt(pstmt, 3, options[3]);
+ setInt(pstmt, 4, options[4]);
+ pstmt.setLong(5, lwhhTagMap.get(key));
+ generateTimeStamp(pstmt, 6);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+ public void dfSrcipTypeBatchStorage(Map<String, Long> srcTypeMap) {
+ String sql = " insert into DF_SRCIP_DOMESTIC_TYPE_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, SRC_PROVINCE, ATTR_TYPE, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DF_SRCIP_TYPE_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : srcTypeMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setInt(pstmt, 2, options[2]);
+ setString(pstmt, 3, options[3], 256);
+ setInt(pstmt, 4, options[4]);
+ pstmt.setLong(5, srcTypeMap.get(key));
+ generateTimeStamp(pstmt, 6);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+ public void dfSrcipTagBatchStorage(Map<String, Long> srcTagMap) {
+ String sql = " insert into DF_SRCIP_DOMESTIC_TAG_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, SRC_PROVINCE, TAG, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DF_SRCIP_TAG_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : srcTagMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setInt(pstmt, 2, options[2]);
+ setString(pstmt, 3, options[3], 256);
+ setInt(pstmt, 4, options[4]);
+ pstmt.setLong(5, srcTagMap.get(key));
+ generateTimeStamp(pstmt, 6);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+ public void dfPzAttrBatchStorage(Map<String, Long> pzAttrMap) {
+ String sql = " insert into DF_PZ_ATTR_REPORT(STAT_ID, ACTIVE_SYS, CFG_ID, ATTR_TYPE, SERVICE, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DF_PZ_ATTR_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : pzAttrMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setLong(pstmt, 2, options[2]);
+ setInt(pstmt, 3, options[3]);
+ setInt(pstmt, 4, options[4]);
+ pstmt.setLong(5, pzAttrMap.get(key));
+ generateTimeStamp(pstmt, 6);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+ public void dfPzTagBatchStorage(Map<String, Long> pzTagMap) {
+ String sql = " insert into DF_PZ_TAG_REPORT(STAT_ID, ACTIVE_SYS, CFG_ID, TAG, SERVICE, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DF_PZ_TAG_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : pzTagMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setLong(pstmt, 2, options[2]);
+ setInt(pstmt, 3, options[3]);
+ setInt(pstmt, 4, options[4]);
+ pstmt.setLong(5, pzTagMap.get(key));
+ generateTimeStamp(pstmt, 6);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+ public void djAttrTypeBatchStorage(Map<String, Long> attrTypeMap) {
+ String sql = " insert into DJ_ATTR_TYPE_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, ATTR_TYPE, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DJ_ATTR_TYPE_REPORT.NEXTVAL, ?, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : attrTypeMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setInt(pstmt, 2, options[2]);
+ setInt(pstmt, 3, options[3]);
+ pstmt.setLong(4, attrTypeMap.get(key));
+ generateTimeStamp(pstmt, 5);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+ public void djDestipCountryBatchStorage(Map<String, Long> srcipMap) {
+ String sql = " insert into DJ_DESTIP_COUNTRY_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, DEST_COUNTRY, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DJ_DESTIP_COUNTRY_REPORT.NEXTVAL, ?, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : srcipMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setInt(pstmt, 2, options[2]);
+ setString(pstmt, 3, options[3], 256);
+ pstmt.setLong(4, srcipMap.get(key));
+ generateTimeStamp(pstmt, 5);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+
+ public void djEntranceBatchStorage(Map<String, Long> entranceMap) {
+ String sql = " insert into DJ_ENTRANCE_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, ENTRANCE_ID, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DJ_ENTRANCE_REPORT.NEXTVAL, ?, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : entranceMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setInt(pstmt, 2, options[2]);
+ setLong(pstmt, 3, options[3]);
+ pstmt.setLong(4, entranceMap.get(key));
+ generateTimeStamp(pstmt, 5);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+
+
+ public void djLwhhBatchStorage(Map<String, Long> lwhhMap) {
+ String sql = " insert into DJ_LWHH_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, LWHH, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DJ_LWHH_REPORT.NEXTVAL, ?, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : lwhhMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setInt(pstmt, 2, options[2]);
+ setInt(pstmt, 3, options[3]);
+ pstmt.setLong(4, lwhhMap.get(key));
+ generateTimeStamp(pstmt, 5);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+
+ public void djServiceBatchStorage(Map<String, Long> serviceMap) {
+ String sql = " insert into DJ_SERVICE_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DJ_SERVICE_REPORT.NEXTVAL, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : serviceMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setInt(pstmt, 2, options[2]);
+ pstmt.setLong(3, serviceMap.get(key));
+ generateTimeStamp(pstmt, 4);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+
+ public void djSrcipDomesticBatchStorage(Map<String, Long> srcipMap) {
+ String sql = " insert into DJ_SRCIP_DOMESTIC_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, SRC_PROVINCE, SRC_CITY, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DJ_SRCIP_DOMESTIC_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : srcipMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setInt(pstmt, 2, options[2]);
+ setString(pstmt, 3, options[3], 256);
+ setStringNullable(pstmt, 4, options[4], 256);
+ pstmt.setLong(5, srcipMap.get(key));
+ generateTimeStamp(pstmt, 6);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+ public void djTagBatchStorage(Map<String, Long> tagMap) {
+ String sql = " insert into DJ_TAG_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, TAG, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DJ_TAG_REPORT.NEXTVAL, ?, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : tagMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setInt(pstmt, 2, options[2]);
+ setInt(pstmt, 3, options[3]);
+ pstmt.setLong(4, tagMap.get(key));
+ generateTimeStamp(pstmt, 5);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+
+ public void djLwhhTypeBatchStorage(Map<String, Long> lwhhTypeMap) {
+ String sql = " insert into DJ_LWHH_TYPE_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, LWHH, ATTR_TYPE, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DJ_LWHH_TYPE_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : lwhhTypeMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setInt(pstmt, 2, options[2]);
+ setInt(pstmt, 3, options[3]);
+ setInt(pstmt, 4, options[4]);
+ pstmt.setLong(5, lwhhTypeMap.get(key));
+ generateTimeStamp(pstmt, 6);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+ public void djLwhhTagBatchStorage(Map<String, Long> lwhhTagMap) {
+ String sql = " insert into DJ_LWHH_TAG_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, LWHH, TAG, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DJ_LWHH_TAG_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : lwhhTagMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setInt(pstmt, 2, options[2]);
+ setInt(pstmt, 3, options[3]);
+ setInt(pstmt, 4, options[4]);
+ pstmt.setLong(5, lwhhTagMap.get(key));
+ generateTimeStamp(pstmt, 6);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+ public void djSrcipTypeBatchStorage(Map<String, Long> srcTypeMap) {
+ String sql = " insert into DJ_SRCIP_DOMESTIC_TYPE_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, SRC_PROVINCE, ATTR_TYPE, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DJ_SRCIP_TYPE_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : srcTypeMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setInt(pstmt, 2, options[2]);
+ setString(pstmt, 3, options[3], 256);
+ setInt(pstmt, 4, options[4]);
+ pstmt.setLong(5, srcTypeMap.get(key));
+ generateTimeStamp(pstmt, 6);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+ public void djSrcipTagBatchStorage(Map<String, Long> srcTagMap) {
+ String sql = " insert into DJ_SRCIP_DOMESTIC_TAG_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, SRC_PROVINCE, TAG, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DJ_SRCIP_TAG_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : srcTagMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setInt(pstmt, 2, options[2]);
+ setString(pstmt, 3, options[3], 256);
+ setInt(pstmt, 4, options[4]);
+ pstmt.setLong(5, srcTagMap.get(key));
+ generateTimeStamp(pstmt, 6);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+ public void djPzAttrBatchStorage(Map<String, Long> pzAttrMap) {
+ String sql = " insert into DJ_PZ_ATTR_REPORT(STAT_ID, ACTIVE_SYS, CFG_ID, ATTR_TYPE, SERVICE, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DJ_PZ_ATTR_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : pzAttrMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setLong(pstmt, 2, options[2]);
+ setInt(pstmt, 3, options[3]);
+ setInt(pstmt, 4, options[4]);
+ pstmt.setLong(5, pzAttrMap.get(key));
+ generateTimeStamp(pstmt, 6);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+
+ public void djPzTagBatchStorage(Map<String, Long> pzTagMap) {
+ String sql = " insert into DJ_PZ_TAG_REPORT(STAT_ID, ACTIVE_SYS, CFG_ID, TAG, SERVICE, SUM, REPORT_TIME) " +
+ " VALUES(SEQ_DJ_PZ_TAG_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)";
+
+ try {
+ connection = manager.getConnection("idb");
+ connection.setAutoCommit(false);
+ pstmt = connection.prepareStatement(sql);
+ } catch (Exception e) {
+ e.printStackTrace();
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ int nums = 0;
+ for (String key : pzTagMap.keySet()) {
+ try {
+ String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER);
+
+ setInt(pstmt, 1, options[1]);
+ setLong(pstmt, 2, options[2]);
+ setInt(pstmt, 3, options[3]);
+ setInt(pstmt, 4, options[4]);
+ pstmt.setLong(5, pzTagMap.get(key));
+ generateTimeStamp(pstmt, 6);
+
+ pstmt.addBatch();
+ nums++;
+ if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) {
+ pstmt.executeBatch();
+ connection.commit();
+ }
+ } catch (Exception e) {
+ logger.error("日志存在非法字段:"+key);
+ e.printStackTrace();
+ }
+ }
+ try {
+ pstmt.executeBatch();
+ connection.commit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ manager.clear(pstmt, null, null);
+ if(connection!=null){
+ manager.freeConnection("idb", connection);
+ }
+ }
+ }
+}
diff --git a/src/main/java/cn/ac/iie/dao/DataBaseLoad.java b/src/main/java/cn/ac/iie/dao/DataBaseLoad.java
new file mode 100644
index 0000000..05b88b3
--- /dev/null
+++ b/src/main/java/cn/ac/iie/dao/DataBaseLoad.java
@@ -0,0 +1,166 @@
+package cn.ac.iie.dao;
+
+
+import cn.ac.iie.bean.ntc.NTC_CONN_RECORD_LOG;
+import cn.ac.iie.common.RealtimeCountConfig;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.log4j.Logger;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.text.SimpleDateFormat;
+import java.util.LinkedList;
+import java.util.Map;
+
+
+/**
+ * 日志写入clickHouse insert类
+ *
+ * @author Administrator
+ * @create 2018-10-31 12:35
+ */
+public class DataBaseLoad {
+ private static final Logger logger = Logger.getLogger(DataBaseLoad.class);
+ private static DbConnect manger = DbConnect.getInstance();
+ private Connection connection;
+ private PreparedStatement pstm;
+
+ public DataBaseLoad() {
+ }
+
+ public void ntcKilledBatchStorage2CH(LinkedList<String> tmpList) {
+ String tableName = RealtimeCountConfig.TABLE_KILLED_NAME;
+ String sql = "INSERT INTO " + tableName + " VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
+ try {
+ connection = manger.getConnection();
+ connection.setAutoCommit(false);
+ pstm = connection.prepareStatement(sql);
+ int nums = 0;
+ for (String ntcLog : tmpList) {
+ NTC_CONN_RECORD_LOG ntcConnRecordLog = JSONObject.parseObject(ntcLog, NTC_CONN_RECORD_LOG.class);
+ pstm.setInt(1, ntcConnRecordLog.getCfg_id());
+ pstm.setInt(2, ntcConnRecordLog.getFound_time());
+ pstm.setInt(3, ntcConnRecordLog.getRecv_time());
+ pstm.setLong(4, ntcConnRecordLog.getOver_id());
+ pstm.setString(5, ntcConnRecordLog.getTrans_proto());
+ pstm.setString(6, ntcConnRecordLog.getD_ip());
+ pstm.setString(7, ntcConnRecordLog.getS_ip());
+ pstm.setInt(8, ntcConnRecordLog.getD_port());
+ pstm.setInt(9, ntcConnRecordLog.getS_port());
+ pstm.setString(10, ntcConnRecordLog.getNest_protocol());
+ pstm.setString(11, ntcConnRecordLog.getNest_server_ip());
+ pstm.setString(12, ntcConnRecordLog.getNest_client_ip());
+ pstm.setInt(13, ntcConnRecordLog.getNest_server_port());
+ pstm.setInt(14, ntcConnRecordLog.getNest_client_port());
+ pstm.setInt(15, ntcConnRecordLog.getService());
+ pstm.setInt(16, ntcConnRecordLog.getEntrance_id());
+ pstm.setString(17, ntcConnRecordLog.getCap_ip());
+ pstm.setString(18, ntcConnRecordLog.getScene_file());
+ pstm.setString(19, ntcConnRecordLog.getInjected_pkt_file());
+ pstm.setString(20, ntcConnRecordLog.getNest_addr_list());
+ pstm.setInt(21, ntcConnRecordLog.getAction());
+ pstm.setString(22, ntcConnRecordLog.getServer_locate());
+ pstm.setString(23, ntcConnRecordLog.getClient_locate());
+ pstm.setString(24, ntcConnRecordLog.getApp_label());
+ pstm.setLong(25, ntcConnRecordLog.getC2s_pkt_num());
+ pstm.setLong(26, ntcConnRecordLog.getS2c_pkt_num());
+ pstm.setLong(27, ntcConnRecordLog.getC2s_byte_num());
+ pstm.setLong(28, ntcConnRecordLog.getS2c_byte_num());
+
+ pstm.setString(29, ntcConnRecordLog.getUser_region());
+ pstm.setInt(30, ntcConnRecordLog.getStream_dir());
+ pstm.setString(31, ntcConnRecordLog.getAddr_list());
+ pstm.setInt(32, ntcConnRecordLog.getCreate_time());
+ pstm.setInt(33, ntcConnRecordLog.getLastmtime());
+
+ pstm.addBatch();
+ nums++;
+ if (nums >= RealtimeCountConfig.BATCH_CHINSERT_KILLED_NUM) {
+ pstm.executeBatch();
+ connection.commit();
+ nums = 0;
+ }
+ }
+ if (nums != 0) {
+ pstm.executeBatch();
+ connection.commit();
+ nums = 0;
+ }
+ } catch (Exception e) {
+ logger.error("+++++++++insert to " + RealtimeCountConfig.TABLE_KILLED_NAME + " Log write failed!!!+++++++++");
+ e.printStackTrace();
+ } finally {
+ manger.clear(pstm, connection);
+ }
+ }
+
+
+
+
+ public void dfPzFlowBatchStorage2CH(LinkedList<String> tmpList) {
+ String tableName = RealtimeCountConfig.TABLE_NAME;
+ String sql = "INSERT INTO " + tableName + " VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
+ try {
+ connection = manger.getConnection();
+ connection.setAutoCommit(false);
+ pstm = connection.prepareStatement(sql);
+ int nums = 0;
+ for (String ntcLog : tmpList) {
+ NTC_CONN_RECORD_LOG ntcConnRecordLog = JSONObject.parseObject(ntcLog, NTC_CONN_RECORD_LOG.class);
+ pstm.setInt(1, ntcConnRecordLog.getCfg_id());
+ pstm.setInt(2, ntcConnRecordLog.getFound_time());
+ pstm.setInt(3, ntcConnRecordLog.getRecv_time());
+ pstm.setLong(4, ntcConnRecordLog.getOver_id());
+ pstm.setString(5, ntcConnRecordLog.getTrans_proto());
+ pstm.setString(6, ntcConnRecordLog.getD_ip());
+ pstm.setString(7, ntcConnRecordLog.getS_ip());
+ pstm.setInt(8, ntcConnRecordLog.getD_port());
+ pstm.setInt(9, ntcConnRecordLog.getS_port());
+ pstm.setString(10, ntcConnRecordLog.getNest_protocol());
+ pstm.setString(11, ntcConnRecordLog.getNest_server_ip());
+ pstm.setString(12, ntcConnRecordLog.getNest_client_ip());
+ pstm.setInt(13, ntcConnRecordLog.getNest_server_port());
+ pstm.setInt(14, ntcConnRecordLog.getNest_client_port());
+ pstm.setInt(15, ntcConnRecordLog.getService());
+ pstm.setInt(16, ntcConnRecordLog.getEntrance_id());
+ pstm.setString(17, ntcConnRecordLog.getCap_ip());
+ pstm.setString(18, ntcConnRecordLog.getScene_file());
+ pstm.setString(19, ntcConnRecordLog.getInjected_pkt_file());
+ pstm.setString(20, ntcConnRecordLog.getNest_addr_list());
+ pstm.setInt(21, ntcConnRecordLog.getAction());
+ pstm.setString(22, ntcConnRecordLog.getServer_locate());
+ pstm.setString(23, ntcConnRecordLog.getClient_locate());
+ pstm.setString(24, ntcConnRecordLog.getApp_label());
+ pstm.setLong(25, ntcConnRecordLog.getC2s_pkt_num());
+ pstm.setLong(26, ntcConnRecordLog.getS2c_pkt_num());
+ pstm.setLong(27, ntcConnRecordLog.getC2s_byte_num());
+ pstm.setLong(28, ntcConnRecordLog.getS2c_byte_num());
+
+ pstm.setString(29, ntcConnRecordLog.getUser_region());
+ pstm.setInt(30, ntcConnRecordLog.getStream_dir());
+ pstm.setString(31, ntcConnRecordLog.getAddr_list());
+ pstm.setInt(32, ntcConnRecordLog.getCreate_time());
+ pstm.setInt(33, ntcConnRecordLog.getLastmtime());
+
+ pstm.addBatch();
+ nums++;
+ if (nums >= RealtimeCountConfig.BATCH_CHINSERT_NUM) {
+ pstm.executeBatch();
+ connection.commit();
+ nums = 0;
+ }
+ }
+ if (nums != 0) {
+ pstm.executeBatch();
+ connection.commit();
+ nums = 0;
+ }
+ } catch (Exception e) {
+ logger.error("+++++++++insert to " + RealtimeCountConfig.TABLE_NAME + " Log write failed!!!+++++++++");
+ e.printStackTrace();
+ } finally {
+ manger.clear(pstm, connection);
+ }
+ }
+
+}
diff --git a/src/main/java/cn/ac/iie/dao/DataBasePzBusiness.java b/src/main/java/cn/ac/iie/dao/DataBasePzBusiness.java
new file mode 100644
index 0000000..fd19201
--- /dev/null
+++ b/src/main/java/cn/ac/iie/dao/DataBasePzBusiness.java
@@ -0,0 +1,122 @@
+package cn.ac.iie.dao;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import org.apache.log4j.Logger;
+
+import cn.ac.iie.bean.PzTable;
+
+public final class DataBasePzBusiness {
+ private static final JdbcPzConnectionManager pzManager = JdbcPzConnectionManager.getInstance();
+ private static final Logger logger = Logger.getLogger(DataBasePzBusiness.class);
+ private Connection connection;
+ private Statement stmt = null;
+
+ public DataBasePzBusiness() {
+
+ }
+
+ public void getPzToMap(Long seq) {
+ String queryOptions = " COMPILE_ID, CONT_TYPE, ATTR_TYPE, CONT_LABEL, Task_id, Guarantee_ID, AFFAIR_ID, TOPIC_ID, IS_VALID, PROC_SEQ";
+ String sql = "select " + queryOptions + " from config_compile where PROC_SEQ > " + seq;
+ try {
+ connection = pzManager.getConnection("idb");
+ connection.setAutoCommit(false);
+ stmt = connection.createStatement();
+ } catch (Exception e) {
+ e.printStackTrace();
+ pzManager.clear(null, stmt, null);
+ if (connection != null) {
+ pzManager.freeConnection("idb", connection);
+ }
+ return;
+ }
+ try {
+ ResultSet rs = stmt.executeQuery(sql);
+ Long seqMax = seq;
+ while (rs.next()) {
+ try {
+ String sCOMPILE_ID = String.valueOf(rs.getLong(1));
+
+ Object oCONT_TYPE = rs.getObject(2);
+ if (oCONT_TYPE == null)
+ oCONT_TYPE = "";
+ String sCONT_TYPE = oCONT_TYPE.toString();
+ if (sCONT_TYPE == null || sCONT_TYPE.equals(""))
+ sCONT_TYPE = "-";
+
+ Object oATTR_TYPE = rs.getObject(3);
+ if (oATTR_TYPE == null)
+ oATTR_TYPE = "";
+ String sATTR_TYPE = oATTR_TYPE.toString();
+ if (sATTR_TYPE == null || sATTR_TYPE.equals(""))
+ sATTR_TYPE = "-";
+
+ Object oCONT_LABEL = rs.getObject(4);
+ if (oCONT_LABEL == null)
+ oCONT_LABEL = "";
+ String sCONT_LABEL = oCONT_LABEL.toString();
+ if (sCONT_LABEL == null || sCONT_LABEL.equals(""))
+ sCONT_LABEL = "-";
+
+ Object oTask_id = rs.getObject(5);
+ if (oTask_id == null)
+ oTask_id = "";
+ String sTask_id = oTask_id.toString();
+ if (sTask_id == null || sTask_id.equals(""))
+ sTask_id = "-";
+
+ Object oGuarantee_ID = rs.getObject(6);
+ if (oGuarantee_ID == null)
+ oGuarantee_ID = "";
+ String sGuarantee_ID = oGuarantee_ID.toString();
+ if (sGuarantee_ID == null || sGuarantee_ID.equals(""))
+ sGuarantee_ID = "-";
+
+ Object oAFFAIR_ID = rs.getObject(7);
+ if (oAFFAIR_ID == null)
+ oAFFAIR_ID = "";
+ String sAFFAIR_ID = oAFFAIR_ID.toString();
+ if (sAFFAIR_ID == null || sAFFAIR_ID.equals(""))
+ sAFFAIR_ID = "-";
+
+ Object oTOPIC_ID = rs.getObject(8);
+ if (oTOPIC_ID == null)
+ oTOPIC_ID = "";
+ String sTOPIC_ID = oTOPIC_ID.toString();
+ if (sTOPIC_ID == null || sTOPIC_ID.equals(""))
+ sTOPIC_ID = "-";
+
+ Integer iIS_VALID = rs.getInt(9);
+
+ Long lPROC_SEQ = rs.getLong(10);
+
+ if (lPROC_SEQ > seqMax) {
+ seqMax = lPROC_SEQ;
+ }
+
+ if (PzTable.seq == 0 && iIS_VALID == 0) {
+ continue;
+ }
+ String value = sCOMPILE_ID + "\t" + sCONT_TYPE + "\t" + sATTR_TYPE + "\t" + sCONT_LABEL + "\t"
+ + sTask_id + "\t" + sGuarantee_ID + "\t" + sAFFAIR_ID + "\t" + sTOPIC_ID + "\t" + iIS_VALID;
+ logger.info("put in map: "+value);
+ PzTable.pzMap.put(sCOMPILE_ID, value);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ PzTable.seq = seqMax;
+ logger.info("PZ UPDATE DONE.");
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ pzManager.clear(null, stmt, null);
+ if (connection != null) {
+ pzManager.freeConnection("idb", connection);
+ }
+ }
+ }
+}
diff --git a/src/main/java/cn/ac/iie/dao/DbConnect.java b/src/main/java/cn/ac/iie/dao/DbConnect.java
new file mode 100644
index 0000000..0635e04
--- /dev/null
+++ b/src/main/java/cn/ac/iie/dao/DbConnect.java
@@ -0,0 +1,102 @@
+package cn.ac.iie.dao;
+
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.druid.pool.DruidPooledConnection;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * Druid连接信息
+ *
+ * @author antlee
+ * @date 2018/8/20
+ */
+public class DbConnect {
+ private static DruidDataSource dataSource = null;
+ private static DbConnect dbConnect = null;
+ private static Properties props = new Properties();
+
+ static {
+ getDbConnect();
+ }
+
+ private static void getDbConnect() {
+ try {
+ if (dataSource == null) {
+ dataSource = new DruidDataSource();
+ props.load(DbConnect.class.getClassLoader().getResourceAsStream("clickhouse.properties"));
+ //设置连接参数
+ dataSource.setUrl("jdbc:clickhouse://" + props.getProperty("db.id"));
+ dataSource.setDriverClassName(props.getProperty("drivers"));
+ dataSource.setUsername(props.getProperty("mdb.user"));
+ dataSource.setPassword(props.getProperty("mdb.password"));
+ //配置初始化大小、最小、最大
+ dataSource.setInitialSize(Integer.parseInt(props.getProperty("initialsize")));
+ dataSource.setMinIdle(Integer.parseInt(props.getProperty("minidle")));
+ dataSource.setMaxActive(Integer.parseInt(props.getProperty("maxactive")));
+ //连接泄漏监测
+ dataSource.setRemoveAbandoned(true);
+ dataSource.setRemoveAbandonedTimeout(30);
+ dataSource.setDefaultAutoCommit(false);
+ //配置获取连接等待超时的时间
+ dataSource.setMaxWait(30000);
+ //配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
+ dataSource.setTimeBetweenEvictionRunsMillis(2000);
+ //防止过期
+ dataSource.setValidationQuery("SELECT 1");
+ dataSource.setTestWhileIdle(true);
+ dataSource.setTestOnBorrow(true);
+ dataSource.setKeepAlive(true);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+
+ }
+ }
+
+ /**
+ * 数据库连接池单例
+ *
+ * @return dbConnect
+ */
+ public static synchronized DbConnect getInstance() {
+ if (null == dbConnect) {
+ dbConnect = new DbConnect();
+ }
+ return dbConnect;
+ }
+
+ /**
+ * 返回druid数据库连接
+ *
+ * @return 连接
+ * @throws SQLException sql异常
+ */
+ public DruidPooledConnection getConnection() throws SQLException {
+ return dataSource.getConnection();
+ }
+
+ /**
+ * 清空PreparedStatement、Connection对象,未定义的置空。
+ *
+ * @param pstmt PreparedStatement对象
+ * @param connection Connection对象
+ */
+ public void clear(PreparedStatement pstmt, Connection connection) {
+ try {
+ if (pstmt != null) {
+ pstmt.close();
+ }
+ if (connection != null) {
+ connection.close();
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+
+ }
+} \ No newline at end of file
diff --git a/src/main/java/cn/ac/iie/dao/JdbcConnectionManager.java b/src/main/java/cn/ac/iie/dao/JdbcConnectionManager.java
new file mode 100644
index 0000000..ef369f7
--- /dev/null
+++ b/src/main/java/cn/ac/iie/dao/JdbcConnectionManager.java
@@ -0,0 +1,392 @@
+package cn.ac.iie.dao;
+
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Date;
+import java.util.Enumeration;
+import java.util.Hashtable;
+import java.util.Properties;
+import java.util.StringTokenizer;
+import java.util.Vector;
+
+import com.nis.exception.DaoException;
+
+
+
+
+
+/**
+ *
+ * <p>JDBC 连接管理类</p>
+ * <p>使用该类需要具备两个条件:
+ * <li>1.数据库驱动jar包</li>
+ * <li>2.在src目录下生成db.properties,具有如下内容
+ * drivers=* 数据库连接驱动类
+ * idb.url=* 数据库连接URL
+ * idb.maxconn=* 线程最大连接数
+ * idb.user=* 数据库连接用户名
+ * idb.password=* 数据库连接密码
+ * </li>
+ * </p>
+ * @author 中科智源育成信息有限公司 E-mail: [email protected]
+ * @version 1.0 创建时间:2010-11-8 下午04:54:15
+ *
+ */
+public final class JdbcConnectionManager {
+ private static final JdbcConnectionManager jdbcConnectionManager = new JdbcConnectionManager() ; // 唯一实例
+ private static int clients;
+ private Vector drivers = new Vector();
+ private Hashtable pools = new Hashtable();
+
+ //抑制默认的构造器,避免实例化对象
+ private JdbcConnectionManager(){
+ init();
+ }
+
+ /**
+ * 读取属性完成初始化
+ */
+ private void init() {
+
+ //InputStream is = getClass().getResourceAsStream("/db.properties");
+
+ Properties dbProps = new Properties();
+ try {
+ InputStream is = JdbcConnectionManager.class.getClassLoader().getResourceAsStream("db.properties");
+ //InputStream is = new FileInputStream(System.getProperty("user.dir")+File.separator+"config"+File.separator+"db.properties");
+ dbProps.load(is);
+ System.out.println("读取数据成功!");
+ } catch (Exception e) {
+ System.err.println("不能读取属性文件. "
+ + "请确保db.properties在CLASSPATH指定的路径中");
+ throw new RuntimeException(new FileNotFoundException("unknow db.properties"));
+ }
+ loadDrivers(dbProps);
+ createPools(dbProps);
+ }
+
+ /**
+ * 装载和注册所有JDBC驱动程序
+ *
+ * @param props
+ * 属性
+ */
+ private void loadDrivers(Properties props) {
+ String driverClasses = props.getProperty("drivers");
+ StringTokenizer st = new StringTokenizer(driverClasses);
+ while (st.hasMoreElements()) {
+ String driverClassName = st.nextToken().trim();
+
+ try {
+
+ Driver driver = (Driver) Class.forName(driverClassName)
+ .newInstance();
+ DriverManager.registerDriver(driver);
+ drivers.addElement(driver);
+ } catch (Exception e) {
+ System.out.println("无法装载驱动,异常信息:" + e.getMessage());
+ }
+ }
+ }
+
+ /**
+ * 根据指定属性创建连接池实例.
+ *
+ * @param props
+ * 连接池属性
+ */
+ private void createPools(Properties props) {
+ Enumeration propNames = props.propertyNames();
+ while (propNames.hasMoreElements()) {
+ String name = (String) propNames.nextElement();
+ if (name.endsWith(".url")) {
+ String poolName = name.substring(0, name.lastIndexOf("."));
+ String url = props.getProperty(poolName + ".url");
+ if (url == null) {
+ System.out.println("没有为连接池" + poolName + "指定URL");
+ continue;
+ }
+ String user = props.getProperty(poolName + ".user");
+ String password = props.getProperty(poolName + ".password");
+ String maxconn = props.getProperty(poolName + ".maxconn", "0");
+
+ int max;
+ try {
+ max = Integer.valueOf(maxconn).intValue();
+ } catch (NumberFormatException e) {
+ System.out.println("错误的最大连接数限制: " + maxconn + " .连接池: "
+ + poolName);
+ max = 0;
+ }
+ DBConnectionPool pool = new DBConnectionPool(poolName, url,
+ user, password, max);
+ pools.put(poolName, pool);
+
+ }
+ }
+ }
+
+
+ /**
+ * 返回singleton实例.如果是第一次调用此方法,则创建实例
+ *
+ * @return JdbcConnectionManager 唯一实例
+ */
+ public static synchronized JdbcConnectionManager getInstance() {
+ clients++;
+ return jdbcConnectionManager;
+ }
+
+
+
+ /**
+ * 将连接对象返回给由名字指定的连接池
+ *
+ * @param name
+ * 在属性文件中定义的连接池名字
+ * @param conn
+ * 连接对象
+ */
+ public void freeConnection(String name, Connection conn) {
+ DBConnectionPool pool = (DBConnectionPool) pools.get(name);
+ if (pool != null) {
+ pool.freeConnection(conn);
+ }
+ }
+
+ /**
+ * 获得一个可用的(空闲的)连接.如果没有可用连接,且已有连接数小于最大连接数 限制,则创建并返回新连接
+ *
+ * @param name
+ * 在属性文件中定义的连接池名字
+ * @return Connection 可用连接或null
+ */
+ public Connection getConnection(String name) throws DaoException{
+ DBConnectionPool pool = (DBConnectionPool) pools.get(name);
+ if (pool != null) {
+ return pool.getConnection();
+ }
+ return null;
+ }
+
+ /**
+ * 获得一个可用连接.若没有可用连接,且已有连接数小于最大连接数限制, 则创建并返回新连接.否则,在指定的时间内等待其它线程释放连接.
+ *
+ * @param name
+ * 连接池名字
+ * @param time
+ * 以毫秒计的等待时间
+ * @return Connection 可用连接或null
+ */
+ public Connection getConnection(String name, long time)throws DaoException {
+ DBConnectionPool pool = (DBConnectionPool) pools.get(name);
+ if (pool != null) {
+ return pool.getConnection(time);
+ }
+ return null;
+ }
+
+ //返回多少个连接
+ private int getClient() {
+ return clients;
+ }
+
+ /**
+ * 关闭所有连接,撤销驱动程序的注册
+ */
+ public synchronized void release() {
+ // 等待直到最后一个客户程序调用
+ if (--clients != 0) {
+ return;
+ }
+
+ Enumeration allPools = pools.elements();
+ while (allPools.hasMoreElements()) {
+ DBConnectionPool pool = (DBConnectionPool) allPools.nextElement();
+ pool.release();
+ }
+
+ Enumeration allDrivers = drivers.elements();
+ while (allDrivers.hasMoreElements()) {
+ Driver driver = (Driver) allDrivers.nextElement();
+ try {
+ DriverManager.deregisterDriver(driver);
+ // System.out.println("撤销JDBC驱动程序 " + driver.getClass().getName()
+ // + "的注册");
+ } catch (SQLException e) {
+ System.out.println("无法撤销下列JDBC驱动程序的注册: "
+ + driver.getClass().getName() + "。错误信息:"
+ + e.getMessage());
+ }
+ }
+ }
+
+ /**
+ *
+ * 清空PreparedStatement、Statement、ResultSet对象,未定义的置空。
+ * @param pstmt PreparedStatement对象
+ * @param stmt Statement对象
+ * @param rs ResultSet对象
+ */
+ public void clear(PreparedStatement pstmt,Statement stmt,ResultSet rs) {
+ try {
+ if (stmt != null) {
+ stmt.close();
+ }
+ if (rs != null) {
+ rs.close();
+ }
+ if (pstmt != null) {
+ pstmt.close();
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+
+ /**
+ * 此内部类定义了一个连接池.它能够根据要求创建新连接,直到预定的最大连接数为止.在返回连接给客户程序之前,它能够验证连接的有效性.
+ */
+ class DBConnectionPool {
+ private int checkedOut;
+ private Vector freeConnections = new Vector();
+ private int maxConn;
+ private String name;
+ private String password;
+ private String URL;
+ private String user;
+
+ /**
+ * 创建新的连接池
+ *
+ * @param name
+ * 连接池名字
+ * @param URL
+ * 数据库的JDBC URL
+ * @param user
+ * 数据库帐号,或 null
+ * @param password
+ * 密码,或 null
+ * @param maxConn
+ * 此连接池允许建立的最大连接数
+ */
+ public DBConnectionPool(String name, String URL, String user,
+ String password, int maxConn) {
+ this.name = name;
+ this.URL = URL;
+ this.user = user;
+ this.password = password;
+ this.maxConn = maxConn;
+ }
+
+ /**
+ * 将不再使用的连接返回给连接池
+ *
+ * @param con
+ * 客户程序释放的连接
+ */
+ public synchronized void freeConnection(Connection con) {
+ // 将指定连接加入到向量末尾
+ freeConnections.addElement(con);
+ checkedOut--;
+ //release();
+ notifyAll();
+ }
+
+ /**
+ * 从连接池获得一个可用连接.如没有空闲的连接且当前连接数小于最大连接 数限制,则创建新连接.如原来登记为可用的连接不再有效,则从向量删除之,
+ * 然后递归调用自己以尝试新的可用连接.
+ */
+ public synchronized Connection getConnection() throws DaoException{
+ Connection con = null;
+ if (freeConnections.size() > 0) {
+ // 获取向量中第一个可用连接
+ con = (Connection) freeConnections.firstElement();
+ freeConnections.removeElementAt(0);
+ try {
+ if (con.isClosed()) {
+ // 递归调用自己,尝试再次获取可用连接
+ con = getConnection();
+ }
+ } catch (SQLException e) {
+ // 递归调用自己,尝试再次获取可用连接
+ con = getConnection();
+ }
+ } else if (maxConn == 0 || checkedOut < maxConn) {
+ con = newConnection();
+ }
+ if (con != null) {
+ checkedOut++;
+ }
+ return con;
+ }
+
+ /**
+ * 从连接池获取可用连接.可以指定客户程序能够等待的最长时间 参见前一个getConnection()方法.
+ *
+ * @param timeout
+ * 以毫秒计的等待时间限制
+ */
+ public synchronized Connection getConnection(long timeout) throws DaoException {
+ long startTime = new Date().getTime();
+ Connection con;
+ while ((con = getConnection()) == null) {
+ try {
+ wait(timeout);
+ } catch (InterruptedException e) {
+ }
+ if ((new Date().getTime() - startTime) >= timeout) {
+ // wait()返回的原因是超时
+ return null;
+ }
+ }
+ return con;
+ }
+
+ /**
+ * 关闭所有连接
+ */
+ public synchronized void release() {
+ Enumeration allConnections = freeConnections.elements();
+ while (allConnections.hasMoreElements()) {
+ Connection con = (Connection) allConnections.nextElement();
+ try {
+ con.close();
+ } catch (SQLException e) {
+ System.out.println("无法关闭连接池" + name + "中的连接,错误信息:"
+ + e.getMessage());
+ }
+ }
+ freeConnections.removeAllElements();
+ }
+
+ /**
+ * 创建新的连接
+ */
+ private Connection newConnection() throws DaoException {
+ Connection con = null;
+ try {
+ if (user == null) {
+ con = DriverManager.getConnection(URL);
+ } else {
+ con = DriverManager.getConnection(URL, user, password);
+ }
+ System.out.println("连接池" + name + "创建一个新的连接");
+ } catch (SQLException e) {
+ throw new DaoException("无法创建下列URL的连接:" + URL + "\n错误信息:"+ e.getMessage());
+ }
+ //freeConnections.addElement(con);
+ return con;
+ }
+ }
+
+}
diff --git a/src/main/java/cn/ac/iie/dao/JdbcPzConnectionManager.java b/src/main/java/cn/ac/iie/dao/JdbcPzConnectionManager.java
new file mode 100644
index 0000000..f35d5d3
--- /dev/null
+++ b/src/main/java/cn/ac/iie/dao/JdbcPzConnectionManager.java
@@ -0,0 +1,392 @@
+package cn.ac.iie.dao;
+
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Date;
+import java.util.Enumeration;
+import java.util.Hashtable;
+import java.util.Properties;
+import java.util.StringTokenizer;
+import java.util.Vector;
+
+import com.nis.exception.DaoException;
+
+
+
+
+
+/**
+ *
+ * <p>JDBC 连接管理类</p>
+ * <p>使用该类需要具备两个条件:
+ * <li>1.数据库驱动jar包</li>
+ * <li>2.在src目录下生成db.properties,具有如下内容
+ * drivers=* 数据库连接驱动类
+ * idb.url=* 数据库连接URL
+ * idb.maxconn=* 线程最大连接数
+ * idb.user=* 数据库连接用户名
+ * idb.password=* 数据库连接密码
+ * </li>
+ * </p>
+ * @author 中科智源育成信息有限公司 E-mail: [email protected]
+ * @version 1.0 创建时间:2010-11-8 下午04:54:15
+ *
+ */
+public final class JdbcPzConnectionManager {
+ private static final JdbcPzConnectionManager jdbcPzConnectionManager = new JdbcPzConnectionManager() ; // 唯一实例
+ private static int clients;
+ private Vector drivers = new Vector();
+ private Hashtable pools = new Hashtable();
+
+ //抑制默认的构造器,避免实例化对象
+ private JdbcPzConnectionManager(){
+ init();
+ }
+
+ /**
+ * 读取属性完成初始化
+ */
+ private void init() {
+
+ //InputStream is = getClass().getResourceAsStream("/db.properties");
+
+ Properties dbProps = new Properties();
+ try {
+ InputStream is = JdbcPzConnectionManager.class.getClassLoader().getResourceAsStream("db_pz.properties");
+ //InputStream is = new FileInputStream(System.getProperty("user.dir")+File.separator+"config"+File.separator+"db.properties");
+ dbProps.load(is);
+ System.out.println("db_pz.properties读取数据成功!");
+ } catch (Exception e) {
+ System.err.println("不能读取属性文件. "
+ + "请确保db_pz.properties在CLASSPATH指定的路径中");
+ throw new RuntimeException(new FileNotFoundException("unknow db_pz.properties"));
+ }
+ loadDrivers(dbProps);
+ createPools(dbProps);
+ }
+
+ /**
+ * 装载和注册所有JDBC驱动程序
+ *
+ * @param props
+ * 属性
+ */
+ private void loadDrivers(Properties props) {
+ String driverClasses = props.getProperty("drivers");
+ StringTokenizer st = new StringTokenizer(driverClasses);
+ while (st.hasMoreElements()) {
+ String driverClassName = st.nextToken().trim();
+
+ try {
+
+ Driver driver = (Driver) Class.forName(driverClassName)
+ .newInstance();
+ DriverManager.registerDriver(driver);
+ drivers.addElement(driver);
+ } catch (Exception e) {
+ System.out.println("无法装载驱动,异常信息:" + e.getMessage());
+ }
+ }
+ }
+
+ /**
+ * 根据指定属性创建连接池实例.
+ *
+ * @param props
+ * 连接池属性
+ */
+ private void createPools(Properties props) {
+ Enumeration propNames = props.propertyNames();
+ while (propNames.hasMoreElements()) {
+ String name = (String) propNames.nextElement();
+ if (name.endsWith(".url")) {
+ String poolName = name.substring(0, name.lastIndexOf("."));
+ String url = props.getProperty(poolName + ".url");
+ if (url == null) {
+ System.out.println("没有为连接池" + poolName + "指定URL");
+ continue;
+ }
+ String user = props.getProperty(poolName + ".user");
+ String password = props.getProperty(poolName + ".password");
+ String maxconn = props.getProperty(poolName + ".maxconn", "0");
+
+ int max;
+ try {
+ max = Integer.valueOf(maxconn).intValue();
+ } catch (NumberFormatException e) {
+ System.out.println("错误的最大连接数限制: " + maxconn + " .连接池: "
+ + poolName);
+ max = 0;
+ }
+ DBConnectionPool pool = new DBConnectionPool(poolName, url,
+ user, password, max);
+ pools.put(poolName, pool);
+
+ }
+ }
+ }
+
+
+ /**
+ * 返回singleton实例.如果是第一次调用此方法,则创建实例
+ *
+ * @return JdbcConnectionManager 唯一实例
+ */
+ public static synchronized JdbcPzConnectionManager getInstance() {
+ clients++;
+ return jdbcPzConnectionManager;
+ }
+
+
+
+ /**
+ * 将连接对象返回给由名字指定的连接池
+ *
+ * @param name
+ * 在属性文件中定义的连接池名字
+ * @param conn
+ * 连接对象
+ */
+ public void freeConnection(String name, Connection conn) {
+ DBConnectionPool pool = (DBConnectionPool) pools.get(name);
+ if (pool != null) {
+ pool.freeConnection(conn);
+ }
+ }
+
+ /**
+ * 获得一个可用的(空闲的)连接.如果没有可用连接,且已有连接数小于最大连接数 限制,则创建并返回新连接
+ *
+ * @param name
+ * 在属性文件中定义的连接池名字
+ * @return Connection 可用连接或null
+ */
+ public Connection getConnection(String name) throws DaoException{
+ DBConnectionPool pool = (DBConnectionPool) pools.get(name);
+ if (pool != null) {
+ return pool.getConnection();
+ }
+ return null;
+ }
+
+ /**
+ * 获得一个可用连接.若没有可用连接,且已有连接数小于最大连接数限制, 则创建并返回新连接.否则,在指定的时间内等待其它线程释放连接.
+ *
+ * @param name
+ * 连接池名字
+ * @param time
+ * 以毫秒计的等待时间
+ * @return Connection 可用连接或null
+ */
+ public Connection getConnection(String name, long time)throws DaoException {
+ DBConnectionPool pool = (DBConnectionPool) pools.get(name);
+ if (pool != null) {
+ return pool.getConnection(time);
+ }
+ return null;
+ }
+
+ //返回多少个连接
+ private int getClient() {
+ return clients;
+ }
+
+ /**
+ * 关闭所有连接,撤销驱动程序的注册
+ */
+ public synchronized void release() {
+ // 等待直到最后一个客户程序调用
+ if (--clients != 0) {
+ return;
+ }
+
+ Enumeration allPools = pools.elements();
+ while (allPools.hasMoreElements()) {
+ DBConnectionPool pool = (DBConnectionPool) allPools.nextElement();
+ pool.release();
+ }
+
+ Enumeration allDrivers = drivers.elements();
+ while (allDrivers.hasMoreElements()) {
+ Driver driver = (Driver) allDrivers.nextElement();
+ try {
+ DriverManager.deregisterDriver(driver);
+ // System.out.println("撤销JDBC驱动程序 " + driver.getClass().getName()
+ // + "的注册");
+ } catch (SQLException e) {
+ System.out.println("无法撤销下列JDBC驱动程序的注册: "
+ + driver.getClass().getName() + "。错误信息:"
+ + e.getMessage());
+ }
+ }
+ }
+
+ /**
+ *
+ * 清空PreparedStatement、Statement、ResultSet对象,未定义的置空。
+ * @param pstmt PreparedStatement对象
+ * @param stmt Statement对象
+ * @param rs ResultSet对象
+ */
+ public void clear(PreparedStatement pstmt,Statement stmt,ResultSet rs) {
+ try {
+ if (stmt != null) {
+ stmt.close();
+ }
+ if (rs != null) {
+ rs.close();
+ }
+ if (pstmt != null) {
+ pstmt.close();
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+
+ /**
+ * 此内部类定义了一个连接池.它能够根据要求创建新连接,直到预定的最大连接数为止.在返回连接给客户程序之前,它能够验证连接的有效性.
+ */
+ class DBConnectionPool {
+ private int checkedOut;
+ private Vector freeConnections = new Vector();
+ private int maxConn;
+ private String name;
+ private String password;
+ private String URL;
+ private String user;
+
+ /**
+ * 创建新的连接池
+ *
+ * @param name
+ * 连接池名字
+ * @param URL
+ * 数据库的JDBC URL
+ * @param user
+ * 数据库帐号,或 null
+ * @param password
+ * 密码,或 null
+ * @param maxConn
+ * 此连接池允许建立的最大连接数
+ */
+ public DBConnectionPool(String name, String URL, String user,
+ String password, int maxConn) {
+ this.name = name;
+ this.URL = URL;
+ this.user = user;
+ this.password = password;
+ this.maxConn = maxConn;
+ }
+
+ /**
+ * 将不再使用的连接返回给连接池
+ *
+ * @param con
+ * 客户程序释放的连接
+ */
+ public synchronized void freeConnection(Connection con) {
+ // 将指定连接加入到向量末尾
+ freeConnections.addElement(con);
+ checkedOut--;
+ //release();
+ notifyAll();
+ }
+
+ /**
+ * 从连接池获得一个可用连接.如没有空闲的连接且当前连接数小于最大连接 数限制,则创建新连接.如原来登记为可用的连接不再有效,则从向量删除之,
+ * 然后递归调用自己以尝试新的可用连接.
+ */
+ public synchronized Connection getConnection() throws DaoException{
+ Connection con = null;
+ if (freeConnections.size() > 0) {
+ // 获取向量中第一个可用连接
+ con = (Connection) freeConnections.firstElement();
+ freeConnections.removeElementAt(0);
+ try {
+ if (con.isClosed()) {
+ // 递归调用自己,尝试再次获取可用连接
+ con = getConnection();
+ }
+ } catch (SQLException e) {
+ // 递归调用自己,尝试再次获取可用连接
+ con = getConnection();
+ }
+ } else if (maxConn == 0 || checkedOut < maxConn) {
+ con = newConnection();
+ }
+ if (con != null) {
+ checkedOut++;
+ }
+ return con;
+ }
+
+ /**
+ * 从连接池获取可用连接.可以指定客户程序能够等待的最长时间 参见前一个getConnection()方法.
+ *
+ * @param timeout
+ * 以毫秒计的等待时间限制
+ */
+ public synchronized Connection getConnection(long timeout) throws DaoException {
+ long startTime = new Date().getTime();
+ Connection con;
+ while ((con = getConnection()) == null) {
+ try {
+ wait(timeout);
+ } catch (InterruptedException e) {
+ }
+ if ((new Date().getTime() - startTime) >= timeout) {
+ // wait()返回的原因是超时
+ return null;
+ }
+ }
+ return con;
+ }
+
+ /**
+ * 关闭所有连接
+ */
+ public synchronized void release() {
+ Enumeration allConnections = freeConnections.elements();
+ while (allConnections.hasMoreElements()) {
+ Connection con = (Connection) allConnections.nextElement();
+ try {
+ con.close();
+ } catch (SQLException e) {
+ System.out.println("无法关闭连接池" + name + "中的连接,错误信息:"
+ + e.getMessage());
+ }
+ }
+ freeConnections.removeAllElements();
+ }
+
+ /**
+ * 创建新的连接
+ */
+ private Connection newConnection() throws DaoException {
+ Connection con = null;
+ try {
+ if (user == null) {
+ con = DriverManager.getConnection(URL);
+ } else {
+ con = DriverManager.getConnection(URL, user, password);
+ }
+ System.out.println("连接池" + name + "创建一个新的连接");
+ } catch (SQLException e) {
+ throw new DaoException("无法创建下列URL的连接:" + URL + "\n错误信息:"+ e.getMessage());
+ }
+ //freeConnections.addElement(con);
+ return con;
+ }
+ }
+
+}
diff --git a/src/main/java/cn/ac/iie/dao/KafkaDB.java b/src/main/java/cn/ac/iie/dao/KafkaDB.java
new file mode 100644
index 0000000..4d15449
--- /dev/null
+++ b/src/main/java/cn/ac/iie/dao/KafkaDB.java
@@ -0,0 +1,81 @@
+package cn.ac.iie.dao;
+
+import cn.ac.iie.bean.voipSipFromToLog.RouteRelationLog;
+import cn.ac.iie.bean.voipSipOrigin.SipOriginALL;
+import cn.ac.iie.common.RealtimeCountConfig;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.log4j.Logger;
+
+import java.text.SimpleDateFormat;
+import java.util.LinkedList;
+import java.util.Properties;
+
+/**
+ * 发送数据的kafka工具类
+ */
+public class KafkaDB {
+ private static Logger logger = Logger.getLogger(KafkaDB.class);
+
+ private static Producer<String, String> producer;
+
+ private static KafkaDB kafkaDB;
+
+ private KafkaDB() {
+ getProducer();
+ }
+
+ public static KafkaDB getInstance() {
+ if (kafkaDB == null) {
+ kafkaDB = new KafkaDB();
+ }
+ return kafkaDB;
+ }
+
+ public void siporiLog2KafkaFromSipInsertBoltDC(LinkedList<String> sipOriJsonS) {
+ long time = System.currentTimeMillis() / 1000L;
+ for (String sipOriJson : sipOriJsonS) {
+ try {
+ SipOriginALL sipOriginLog = JSONObject.parseObject(sipOriJson, SipOriginALL.class);
+ sipOriginLog.setStat_time(time);
+ producer.send(new ProducerRecord<>(RealtimeCountConfig.KAFKA_SIP_COMPLEMENT_TOPIC, JSONObject.toJSONString(sipOriginLog)));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ producer.flush();
+ }
+
+ public void routeRelatLog2KafkaFromSipInsertBoltDC(LinkedList<String> routeJsonS) {
+ long time = System.currentTimeMillis() / 1000L;
+ for (String routeJson : routeJsonS) {
+ try {
+ RouteRelationLog routeRelationLog = JSONObject.parseObject(routeJson, RouteRelationLog.class);
+ routeRelationLog.setTimestamp(time);
+ producer.send(new ProducerRecord<>(RealtimeCountConfig.KAFKA_ROUTE_RELATION_TOPIC, JSONObject.toJSONString(routeRelationLog)));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ producer.flush();
+ }
+
+ /**
+ * 根据kafka生产者配置信息初始化kafka消息生产者,只初始化一次
+ */
+ private void getProducer() {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", RealtimeCountConfig.BOOTSTRAP_OUTPUT_SERVERS);
+ properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ properties.put("acks", "1");
+ properties.put("linger.ms", "2");
+ properties.put("request.timeout.ms", 20000);
+ properties.put("batch.size", 262144);
+ properties.put("buffer.memory", 33554432);
+ producer = new KafkaProducer<String, String>(properties);
+ }
+
+}