diff options
Diffstat (limited to 'src/main/java/cn/ac/iie/dao')
| -rw-r--r-- | src/main/java/cn/ac/iie/dao/DataBaseBusiness.java | 1604 | ||||
| -rw-r--r-- | src/main/java/cn/ac/iie/dao/DataBaseLoad.java | 166 | ||||
| -rw-r--r-- | src/main/java/cn/ac/iie/dao/DataBasePzBusiness.java | 122 | ||||
| -rw-r--r-- | src/main/java/cn/ac/iie/dao/DbConnect.java | 102 | ||||
| -rw-r--r-- | src/main/java/cn/ac/iie/dao/JdbcConnectionManager.java | 392 | ||||
| -rw-r--r-- | src/main/java/cn/ac/iie/dao/JdbcPzConnectionManager.java | 392 | ||||
| -rw-r--r-- | src/main/java/cn/ac/iie/dao/KafkaDB.java | 81 |
7 files changed, 2859 insertions, 0 deletions
diff --git a/src/main/java/cn/ac/iie/dao/DataBaseBusiness.java b/src/main/java/cn/ac/iie/dao/DataBaseBusiness.java new file mode 100644 index 0000000..9fae542 --- /dev/null +++ b/src/main/java/cn/ac/iie/dao/DataBaseBusiness.java @@ -0,0 +1,1604 @@ +package cn.ac.iie.dao; + +import java.math.BigDecimal; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.sql.Types; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Map; +import org.apache.log4j.Logger; +import cn.ac.iie.common.RealtimeCountConfig; + + +public final class DataBaseBusiness { + private static final JdbcConnectionManager manager = JdbcConnectionManager.getInstance(); + private static final Logger logger = Logger.getLogger(DataBaseBusiness.class); + private Connection connection; + private PreparedStatement pstmt; + + public DataBaseBusiness(){ + + } + + private Long generateTimeWithInterval() throws ParseException{ + SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + String dt = df.format(new Date()); + System.out.println(dt); + + String[] s = dt.split(":"); + Integer i = Integer.valueOf(s[1]); + String ret = ""; + if(i<5){ + ret = s[0]+":00:00"; + } else if(i<10){ + ret = s[0]+":05:00"; + } else if(i<15){ + ret = s[0]+":10:00"; + } else if(i<20){ + ret = s[0]+":15:00"; + } else if(i<25){ + ret = s[0]+":20:00"; + } else if(i<30){ + ret = s[0]+":25:00"; + } else if(i<35){ + ret = s[0]+":30:00"; + } else if(i<40){ + ret = s[0]+":35:00"; + } else if(i<45){ + ret = s[0]+":40:00"; + } else if(i<50){ + ret = s[0]+":45:00"; + } else if(i<55){ + ret = s[0]+":50:00"; + } else if(i<=59){ + ret = s[0]+":55:00"; + } + + Date date=df.parse(ret); + Long l = date.getTime()+300000; + return l; + } + + @SuppressWarnings("unused") + private void setBigDecimalNullable(PreparedStatement pstmts, int index, String str) throws Exception { + if (str.equals(RealtimeCountConfig.EMPTY_OPTION_CHARACTER) || str.isEmpty()) { + pstmts.setNull(index, Types.BIGINT); + } else { + pstmts.setBigDecimal(index, new BigDecimal(str)); + } + } + + @SuppressWarnings("unused") + private void setBigDecimal(PreparedStatement pstmts, int index, String str) throws Exception { + pstmts.setBigDecimal(index, new BigDecimal(str)); + } + + @SuppressWarnings("unused") + private void setLongNullable(PreparedStatement pstmts, int index, String str) throws Exception { + if (str.equals(RealtimeCountConfig.EMPTY_OPTION_CHARACTER) || str.isEmpty()) { + pstmts.setNull(index, Types.BIGINT); + } else { + pstmts.setLong(index, Long.parseLong(str)); + } + } + + private void setLong(PreparedStatement pstmts, int index, String str) throws Exception { + pstmts.setLong(index, Long.parseLong(str)); + } + + @SuppressWarnings("unused") + private void setIntNullable(PreparedStatement pstmts, int index, String str) throws Exception { + if (str.equals(RealtimeCountConfig.EMPTY_OPTION_CHARACTER) || str.isEmpty()) { + pstmts.setNull(index, Types.INTEGER); + } else { + pstmts.setInt(index, Integer.parseInt(str)); + } + } + + private void setInt(PreparedStatement pstmts, int index, String str) throws Exception { + pstmts.setInt(index, Integer.parseInt(str)); + } + + private void setStringNullable(PreparedStatement pstmts, int index, String str, Integer maxLen) throws Exception { + if(str.equals(RealtimeCountConfig.EMPTY_OPTION_CHARACTER) || str.isEmpty()){ + pstmts.setString(index, ""); + } else { + int byteLen = str.getBytes("utf-8").length; + if(byteLen>maxLen){ + pstmts.setString(index, str.substring(0, str.length()-(byteLen-maxLen))); + } else { + pstmts.setString(index, str); + } + } + } + + private void setString(PreparedStatement pstmts, int index, String str, Integer maxLen) throws Exception { + if(str.equals(RealtimeCountConfig.EMPTY_OPTION_CHARACTER) || str.isEmpty()){ + throw new SQLException("字符串为空,如字符串可空,请使用setStringNullable函数"); + } else { + int byteLen = str.getBytes("utf-8").length; + if(byteLen>maxLen){ + pstmts.setString(index, str.substring(0, str.length()-(byteLen-maxLen))); + } else { + pstmts.setString(index, str); + } + } + } + + @SuppressWarnings("unused") + private void setTimeStampNullable(PreparedStatement pstmts, int index, String str) throws Exception { + if (str.equals(RealtimeCountConfig.EMPTY_OPTION_CHARACTER) || str.isEmpty()) { + pstmts.setNull(index, Types.TIMESTAMP); + } else { + pstmts.setTimestamp(index, new Timestamp(Long.parseLong(str + "000"))); + } + } + + @SuppressWarnings("unused") + private void setTimeStamp(PreparedStatement pstmts, int index, String str) throws Exception { + if(str.equals(RealtimeCountConfig.EMPTY_OPTION_CHARACTER) || str.isEmpty()){ + throw new SQLException("时间为空,如字符串可空,请使用setTimeStampNullable函数"); + } else { + pstmts.setTimestamp(index, new Timestamp(Long.parseLong(str + "000"))); + } + } + + private void generateTimeStamp(PreparedStatement pstmts, int index) throws Exception { + pstmts.setTimestamp(index, new Timestamp(generateTimeWithInterval())); + } + + public void dfPzBatchStorage(Map<String, Long> pzMap) { + String sql = " insert into DF_PZ_REPORT(STAT_ID, ACTIVE_SYS, CFG_ID, SERVICE, SUM, REPORT_TIME) " + + " VALUES(SEQ_DF_PZ_REPORT.NEXTVAL, ?, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : pzMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setLong(pstmt, 2, options[2]); + setInt(pstmt, 3, options[3]); + pstmt.setLong(4, pzMap.get(key)); + generateTimeStamp(pstmt, 5); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + + public void dfServiceBatchStorage(Map<String, Long> serviceMap) { + String sql = " insert into DF_SERVICE_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, SUM, REPORT_TIME) " + + " VALUES(SEQ_DF_SERVICE_REPORT.NEXTVAL, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : serviceMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setInt(pstmt, 2, options[2]); + pstmt.setLong(3, serviceMap.get(key)); + generateTimeStamp(pstmt, 4); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + + public void dfTagBatchStorage(Map<String, Long> tagMap) { + String sql = " insert into DF_TAG_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, TAG, SUM, REPORT_TIME) " + + " VALUES(SEQ_DF_TAG_REPORT.NEXTVAL, ?, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : tagMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setInt(pstmt, 2, options[2]); + setInt(pstmt, 3, options[3]); + pstmt.setLong(4, tagMap.get(key)); + generateTimeStamp(pstmt, 5); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + + public void dfSrcipDomesticBatchStorage(Map<String, Long> srcipMap) { + String sql = " insert into DF_SRCIP_DOMESTIC_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, SRC_PROVINCE, SRC_CITY, SUM, REPORT_TIME) " + + " VALUES(SEQ_DF_SRCIP_DOMESTIC_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : srcipMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setInt(pstmt, 2, options[2]); + setString(pstmt, 3, options[3], 256); + setStringNullable(pstmt, 4, options[4], 256); + pstmt.setLong(5, srcipMap.get(key)); + generateTimeStamp(pstmt, 6); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + + public void dfDestipCountryBatchStorage(Map<String, Long> srcipMap) { + String sql = " insert into DF_DESTIP_COUNTRY_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, DEST_COUNTRY, SUM, REPORT_TIME) " + + " VALUES(SEQ_DF_DESTIP_COUNTRY_REPORT.NEXTVAL, ?, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : srcipMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setInt(pstmt, 2, options[2]); + setString(pstmt, 3, options[3], 256); + pstmt.setLong(4, srcipMap.get(key)); + generateTimeStamp(pstmt, 5); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + + public void djPzBatchStorage(Map<String, Long> pzMap) { + String sql = " insert into DJ_PZ_REPORT(STAT_ID, ACTIVE_SYS, CFG_ID, SERVICE, SUM, REPORT_TIME) " + + " VALUES(SEQ_DJ_PZ_REPORT.NEXTVAL, ?, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : pzMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setLong(pstmt, 2, options[2]); + setInt(pstmt, 3, options[3]); + pstmt.setLong(4, pzMap.get(key)); + generateTimeStamp(pstmt, 5); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + public void dfAttrTypeBatchStorage(Map<String, Long> attrTypeMap) { + String sql = " insert into DF_ATTR_TYPE_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, ATTR_TYPE, SUM, REPORT_TIME) " + + " VALUES(SEQ_DF_ATTR_TYPE_REPORT.NEXTVAL, ?, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : attrTypeMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setInt(pstmt, 2, options[2]); + setInt(pstmt, 3, options[3]); + pstmt.setLong(4, attrTypeMap.get(key)); + generateTimeStamp(pstmt, 5); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + public void dfEntranceBatchStorage(Map<String, Long> entranceMap) { + String sql = " insert into DF_ENTRANCE_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, ENTRANCE_ID, SUM, REPORT_TIME) " + + " VALUES(SEQ_DF_ENTRANCE_REPORT.NEXTVAL, ?, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : entranceMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setInt(pstmt, 2, options[2]); + setLong(pstmt, 3, options[3]); + pstmt.setLong(4, entranceMap.get(key)); + generateTimeStamp(pstmt, 5); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + public void dfLwhhBatchStorage(Map<String, Long> lwhhMap) { + String sql = " insert into DF_LWHH_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, LWHH, SUM, REPORT_TIME) " + + " VALUES(SEQ_DF_LWHH_REPORT.NEXTVAL, ?, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : lwhhMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setInt(pstmt, 2, options[2]); + setInt(pstmt, 3, options[3]); + pstmt.setLong(4, lwhhMap.get(key)); + generateTimeStamp(pstmt, 5); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + public void dfLwhhTypeBatchStorage(Map<String, Long> lwhhTypeMap) { + String sql = " insert into DF_LWHH_TYPE_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, LWHH, ATTR_TYPE, SUM, REPORT_TIME) " + + " VALUES(SEQ_DF_LWHH_TYPE_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : lwhhTypeMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setInt(pstmt, 2, options[2]); + setInt(pstmt, 3, options[3]); + setInt(pstmt, 4, options[4]); + pstmt.setLong(5, lwhhTypeMap.get(key)); + generateTimeStamp(pstmt, 6); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + public void dfLwhhTagBatchStorage(Map<String, Long> lwhhTagMap) { + String sql = " insert into DF_LWHH_TAG_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, LWHH, TAG, SUM, REPORT_TIME) " + + " VALUES(SEQ_DF_LWHH_TAG_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : lwhhTagMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setInt(pstmt, 2, options[2]); + setInt(pstmt, 3, options[3]); + setInt(pstmt, 4, options[4]); + pstmt.setLong(5, lwhhTagMap.get(key)); + generateTimeStamp(pstmt, 6); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + public void dfSrcipTypeBatchStorage(Map<String, Long> srcTypeMap) { + String sql = " insert into DF_SRCIP_DOMESTIC_TYPE_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, SRC_PROVINCE, ATTR_TYPE, SUM, REPORT_TIME) " + + " VALUES(SEQ_DF_SRCIP_TYPE_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : srcTypeMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setInt(pstmt, 2, options[2]); + setString(pstmt, 3, options[3], 256); + setInt(pstmt, 4, options[4]); + pstmt.setLong(5, srcTypeMap.get(key)); + generateTimeStamp(pstmt, 6); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + public void dfSrcipTagBatchStorage(Map<String, Long> srcTagMap) { + String sql = " insert into DF_SRCIP_DOMESTIC_TAG_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, SRC_PROVINCE, TAG, SUM, REPORT_TIME) " + + " VALUES(SEQ_DF_SRCIP_TAG_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : srcTagMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setInt(pstmt, 2, options[2]); + setString(pstmt, 3, options[3], 256); + setInt(pstmt, 4, options[4]); + pstmt.setLong(5, srcTagMap.get(key)); + generateTimeStamp(pstmt, 6); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + public void dfPzAttrBatchStorage(Map<String, Long> pzAttrMap) { + String sql = " insert into DF_PZ_ATTR_REPORT(STAT_ID, ACTIVE_SYS, CFG_ID, ATTR_TYPE, SERVICE, SUM, REPORT_TIME) " + + " VALUES(SEQ_DF_PZ_ATTR_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : pzAttrMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setLong(pstmt, 2, options[2]); + setInt(pstmt, 3, options[3]); + setInt(pstmt, 4, options[4]); + pstmt.setLong(5, pzAttrMap.get(key)); + generateTimeStamp(pstmt, 6); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + public void dfPzTagBatchStorage(Map<String, Long> pzTagMap) { + String sql = " insert into DF_PZ_TAG_REPORT(STAT_ID, ACTIVE_SYS, CFG_ID, TAG, SERVICE, SUM, REPORT_TIME) " + + " VALUES(SEQ_DF_PZ_TAG_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : pzTagMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setLong(pstmt, 2, options[2]); + setInt(pstmt, 3, options[3]); + setInt(pstmt, 4, options[4]); + pstmt.setLong(5, pzTagMap.get(key)); + generateTimeStamp(pstmt, 6); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + public void djAttrTypeBatchStorage(Map<String, Long> attrTypeMap) { + String sql = " insert into DJ_ATTR_TYPE_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, ATTR_TYPE, SUM, REPORT_TIME) " + + " VALUES(SEQ_DJ_ATTR_TYPE_REPORT.NEXTVAL, ?, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : attrTypeMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setInt(pstmt, 2, options[2]); + setInt(pstmt, 3, options[3]); + pstmt.setLong(4, attrTypeMap.get(key)); + generateTimeStamp(pstmt, 5); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + public void djDestipCountryBatchStorage(Map<String, Long> srcipMap) { + String sql = " insert into DJ_DESTIP_COUNTRY_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, DEST_COUNTRY, SUM, REPORT_TIME) " + + " VALUES(SEQ_DJ_DESTIP_COUNTRY_REPORT.NEXTVAL, ?, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : srcipMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setInt(pstmt, 2, options[2]); + setString(pstmt, 3, options[3], 256); + pstmt.setLong(4, srcipMap.get(key)); + generateTimeStamp(pstmt, 5); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + + public void djEntranceBatchStorage(Map<String, Long> entranceMap) { + String sql = " insert into DJ_ENTRANCE_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, ENTRANCE_ID, SUM, REPORT_TIME) " + + " VALUES(SEQ_DJ_ENTRANCE_REPORT.NEXTVAL, ?, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : entranceMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setInt(pstmt, 2, options[2]); + setLong(pstmt, 3, options[3]); + pstmt.setLong(4, entranceMap.get(key)); + generateTimeStamp(pstmt, 5); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + + + public void djLwhhBatchStorage(Map<String, Long> lwhhMap) { + String sql = " insert into DJ_LWHH_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, LWHH, SUM, REPORT_TIME) " + + " VALUES(SEQ_DJ_LWHH_REPORT.NEXTVAL, ?, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : lwhhMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setInt(pstmt, 2, options[2]); + setInt(pstmt, 3, options[3]); + pstmt.setLong(4, lwhhMap.get(key)); + generateTimeStamp(pstmt, 5); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + + public void djServiceBatchStorage(Map<String, Long> serviceMap) { + String sql = " insert into DJ_SERVICE_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, SUM, REPORT_TIME) " + + " VALUES(SEQ_DJ_SERVICE_REPORT.NEXTVAL, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : serviceMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setInt(pstmt, 2, options[2]); + pstmt.setLong(3, serviceMap.get(key)); + generateTimeStamp(pstmt, 4); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + + public void djSrcipDomesticBatchStorage(Map<String, Long> srcipMap) { + String sql = " insert into DJ_SRCIP_DOMESTIC_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, SRC_PROVINCE, SRC_CITY, SUM, REPORT_TIME) " + + " VALUES(SEQ_DJ_SRCIP_DOMESTIC_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : srcipMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setInt(pstmt, 2, options[2]); + setString(pstmt, 3, options[3], 256); + setStringNullable(pstmt, 4, options[4], 256); + pstmt.setLong(5, srcipMap.get(key)); + generateTimeStamp(pstmt, 6); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + public void djTagBatchStorage(Map<String, Long> tagMap) { + String sql = " insert into DJ_TAG_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, TAG, SUM, REPORT_TIME) " + + " VALUES(SEQ_DJ_TAG_REPORT.NEXTVAL, ?, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : tagMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setInt(pstmt, 2, options[2]); + setInt(pstmt, 3, options[3]); + pstmt.setLong(4, tagMap.get(key)); + generateTimeStamp(pstmt, 5); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + + public void djLwhhTypeBatchStorage(Map<String, Long> lwhhTypeMap) { + String sql = " insert into DJ_LWHH_TYPE_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, LWHH, ATTR_TYPE, SUM, REPORT_TIME) " + + " VALUES(SEQ_DJ_LWHH_TYPE_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : lwhhTypeMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setInt(pstmt, 2, options[2]); + setInt(pstmt, 3, options[3]); + setInt(pstmt, 4, options[4]); + pstmt.setLong(5, lwhhTypeMap.get(key)); + generateTimeStamp(pstmt, 6); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + public void djLwhhTagBatchStorage(Map<String, Long> lwhhTagMap) { + String sql = " insert into DJ_LWHH_TAG_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, LWHH, TAG, SUM, REPORT_TIME) " + + " VALUES(SEQ_DJ_LWHH_TAG_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : lwhhTagMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setInt(pstmt, 2, options[2]); + setInt(pstmt, 3, options[3]); + setInt(pstmt, 4, options[4]); + pstmt.setLong(5, lwhhTagMap.get(key)); + generateTimeStamp(pstmt, 6); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + public void djSrcipTypeBatchStorage(Map<String, Long> srcTypeMap) { + String sql = " insert into DJ_SRCIP_DOMESTIC_TYPE_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, SRC_PROVINCE, ATTR_TYPE, SUM, REPORT_TIME) " + + " VALUES(SEQ_DJ_SRCIP_TYPE_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : srcTypeMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setInt(pstmt, 2, options[2]); + setString(pstmt, 3, options[3], 256); + setInt(pstmt, 4, options[4]); + pstmt.setLong(5, srcTypeMap.get(key)); + generateTimeStamp(pstmt, 6); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + public void djSrcipTagBatchStorage(Map<String, Long> srcTagMap) { + String sql = " insert into DJ_SRCIP_DOMESTIC_TAG_REPORT(STAT_ID, ACTIVE_SYS, SERVICE, SRC_PROVINCE, TAG, SUM, REPORT_TIME) " + + " VALUES(SEQ_DJ_SRCIP_TAG_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : srcTagMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setInt(pstmt, 2, options[2]); + setString(pstmt, 3, options[3], 256); + setInt(pstmt, 4, options[4]); + pstmt.setLong(5, srcTagMap.get(key)); + generateTimeStamp(pstmt, 6); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + public void djPzAttrBatchStorage(Map<String, Long> pzAttrMap) { + String sql = " insert into DJ_PZ_ATTR_REPORT(STAT_ID, ACTIVE_SYS, CFG_ID, ATTR_TYPE, SERVICE, SUM, REPORT_TIME) " + + " VALUES(SEQ_DJ_PZ_ATTR_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : pzAttrMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setLong(pstmt, 2, options[2]); + setInt(pstmt, 3, options[3]); + setInt(pstmt, 4, options[4]); + pstmt.setLong(5, pzAttrMap.get(key)); + generateTimeStamp(pstmt, 6); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } + + public void djPzTagBatchStorage(Map<String, Long> pzTagMap) { + String sql = " insert into DJ_PZ_TAG_REPORT(STAT_ID, ACTIVE_SYS, CFG_ID, TAG, SERVICE, SUM, REPORT_TIME) " + + " VALUES(SEQ_DJ_PZ_TAG_REPORT.NEXTVAL, ?, ?, ?, ?, ?, ?)"; + + try { + connection = manager.getConnection("idb"); + connection.setAutoCommit(false); + pstmt = connection.prepareStatement(sql); + } catch (Exception e) { + e.printStackTrace(); + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + return; + } + int nums = 0; + for (String key : pzTagMap.keySet()) { + try { + String[] options = key.split(RealtimeCountConfig.BETWEEN_BOLTS_SPLITTER); + + setInt(pstmt, 1, options[1]); + setLong(pstmt, 2, options[2]); + setInt(pstmt, 3, options[3]); + setInt(pstmt, 4, options[4]); + pstmt.setLong(5, pzTagMap.get(key)); + generateTimeStamp(pstmt, 6); + + pstmt.addBatch(); + nums++; + if(nums % RealtimeCountConfig.BATCH_INSERT_NUM == 0) { + pstmt.executeBatch(); + connection.commit(); + } + } catch (Exception e) { + logger.error("日志存在非法字段:"+key); + e.printStackTrace(); + } + } + try { + pstmt.executeBatch(); + connection.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + manager.clear(pstmt, null, null); + if(connection!=null){ + manager.freeConnection("idb", connection); + } + } + } +} diff --git a/src/main/java/cn/ac/iie/dao/DataBaseLoad.java b/src/main/java/cn/ac/iie/dao/DataBaseLoad.java new file mode 100644 index 0000000..05b88b3 --- /dev/null +++ b/src/main/java/cn/ac/iie/dao/DataBaseLoad.java @@ -0,0 +1,166 @@ +package cn.ac.iie.dao; + + +import cn.ac.iie.bean.ntc.NTC_CONN_RECORD_LOG; +import cn.ac.iie.common.RealtimeCountConfig; +import com.alibaba.fastjson.JSONObject; +import org.apache.log4j.Logger; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.text.SimpleDateFormat; +import java.util.LinkedList; +import java.util.Map; + + +/** + * 日志写入clickHouse insert类 + * + * @author Administrator + * @create 2018-10-31 12:35 + */ +public class DataBaseLoad { + private static final Logger logger = Logger.getLogger(DataBaseLoad.class); + private static DbConnect manger = DbConnect.getInstance(); + private Connection connection; + private PreparedStatement pstm; + + public DataBaseLoad() { + } + + public void ntcKilledBatchStorage2CH(LinkedList<String> tmpList) { + String tableName = RealtimeCountConfig.TABLE_KILLED_NAME; + String sql = "INSERT INTO " + tableName + " VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; + try { + connection = manger.getConnection(); + connection.setAutoCommit(false); + pstm = connection.prepareStatement(sql); + int nums = 0; + for (String ntcLog : tmpList) { + NTC_CONN_RECORD_LOG ntcConnRecordLog = JSONObject.parseObject(ntcLog, NTC_CONN_RECORD_LOG.class); + pstm.setInt(1, ntcConnRecordLog.getCfg_id()); + pstm.setInt(2, ntcConnRecordLog.getFound_time()); + pstm.setInt(3, ntcConnRecordLog.getRecv_time()); + pstm.setLong(4, ntcConnRecordLog.getOver_id()); + pstm.setString(5, ntcConnRecordLog.getTrans_proto()); + pstm.setString(6, ntcConnRecordLog.getD_ip()); + pstm.setString(7, ntcConnRecordLog.getS_ip()); + pstm.setInt(8, ntcConnRecordLog.getD_port()); + pstm.setInt(9, ntcConnRecordLog.getS_port()); + pstm.setString(10, ntcConnRecordLog.getNest_protocol()); + pstm.setString(11, ntcConnRecordLog.getNest_server_ip()); + pstm.setString(12, ntcConnRecordLog.getNest_client_ip()); + pstm.setInt(13, ntcConnRecordLog.getNest_server_port()); + pstm.setInt(14, ntcConnRecordLog.getNest_client_port()); + pstm.setInt(15, ntcConnRecordLog.getService()); + pstm.setInt(16, ntcConnRecordLog.getEntrance_id()); + pstm.setString(17, ntcConnRecordLog.getCap_ip()); + pstm.setString(18, ntcConnRecordLog.getScene_file()); + pstm.setString(19, ntcConnRecordLog.getInjected_pkt_file()); + pstm.setString(20, ntcConnRecordLog.getNest_addr_list()); + pstm.setInt(21, ntcConnRecordLog.getAction()); + pstm.setString(22, ntcConnRecordLog.getServer_locate()); + pstm.setString(23, ntcConnRecordLog.getClient_locate()); + pstm.setString(24, ntcConnRecordLog.getApp_label()); + pstm.setLong(25, ntcConnRecordLog.getC2s_pkt_num()); + pstm.setLong(26, ntcConnRecordLog.getS2c_pkt_num()); + pstm.setLong(27, ntcConnRecordLog.getC2s_byte_num()); + pstm.setLong(28, ntcConnRecordLog.getS2c_byte_num()); + + pstm.setString(29, ntcConnRecordLog.getUser_region()); + pstm.setInt(30, ntcConnRecordLog.getStream_dir()); + pstm.setString(31, ntcConnRecordLog.getAddr_list()); + pstm.setInt(32, ntcConnRecordLog.getCreate_time()); + pstm.setInt(33, ntcConnRecordLog.getLastmtime()); + + pstm.addBatch(); + nums++; + if (nums >= RealtimeCountConfig.BATCH_CHINSERT_KILLED_NUM) { + pstm.executeBatch(); + connection.commit(); + nums = 0; + } + } + if (nums != 0) { + pstm.executeBatch(); + connection.commit(); + nums = 0; + } + } catch (Exception e) { + logger.error("+++++++++insert to " + RealtimeCountConfig.TABLE_KILLED_NAME + " Log write failed!!!+++++++++"); + e.printStackTrace(); + } finally { + manger.clear(pstm, connection); + } + } + + + + + public void dfPzFlowBatchStorage2CH(LinkedList<String> tmpList) { + String tableName = RealtimeCountConfig.TABLE_NAME; + String sql = "INSERT INTO " + tableName + " VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; + try { + connection = manger.getConnection(); + connection.setAutoCommit(false); + pstm = connection.prepareStatement(sql); + int nums = 0; + for (String ntcLog : tmpList) { + NTC_CONN_RECORD_LOG ntcConnRecordLog = JSONObject.parseObject(ntcLog, NTC_CONN_RECORD_LOG.class); + pstm.setInt(1, ntcConnRecordLog.getCfg_id()); + pstm.setInt(2, ntcConnRecordLog.getFound_time()); + pstm.setInt(3, ntcConnRecordLog.getRecv_time()); + pstm.setLong(4, ntcConnRecordLog.getOver_id()); + pstm.setString(5, ntcConnRecordLog.getTrans_proto()); + pstm.setString(6, ntcConnRecordLog.getD_ip()); + pstm.setString(7, ntcConnRecordLog.getS_ip()); + pstm.setInt(8, ntcConnRecordLog.getD_port()); + pstm.setInt(9, ntcConnRecordLog.getS_port()); + pstm.setString(10, ntcConnRecordLog.getNest_protocol()); + pstm.setString(11, ntcConnRecordLog.getNest_server_ip()); + pstm.setString(12, ntcConnRecordLog.getNest_client_ip()); + pstm.setInt(13, ntcConnRecordLog.getNest_server_port()); + pstm.setInt(14, ntcConnRecordLog.getNest_client_port()); + pstm.setInt(15, ntcConnRecordLog.getService()); + pstm.setInt(16, ntcConnRecordLog.getEntrance_id()); + pstm.setString(17, ntcConnRecordLog.getCap_ip()); + pstm.setString(18, ntcConnRecordLog.getScene_file()); + pstm.setString(19, ntcConnRecordLog.getInjected_pkt_file()); + pstm.setString(20, ntcConnRecordLog.getNest_addr_list()); + pstm.setInt(21, ntcConnRecordLog.getAction()); + pstm.setString(22, ntcConnRecordLog.getServer_locate()); + pstm.setString(23, ntcConnRecordLog.getClient_locate()); + pstm.setString(24, ntcConnRecordLog.getApp_label()); + pstm.setLong(25, ntcConnRecordLog.getC2s_pkt_num()); + pstm.setLong(26, ntcConnRecordLog.getS2c_pkt_num()); + pstm.setLong(27, ntcConnRecordLog.getC2s_byte_num()); + pstm.setLong(28, ntcConnRecordLog.getS2c_byte_num()); + + pstm.setString(29, ntcConnRecordLog.getUser_region()); + pstm.setInt(30, ntcConnRecordLog.getStream_dir()); + pstm.setString(31, ntcConnRecordLog.getAddr_list()); + pstm.setInt(32, ntcConnRecordLog.getCreate_time()); + pstm.setInt(33, ntcConnRecordLog.getLastmtime()); + + pstm.addBatch(); + nums++; + if (nums >= RealtimeCountConfig.BATCH_CHINSERT_NUM) { + pstm.executeBatch(); + connection.commit(); + nums = 0; + } + } + if (nums != 0) { + pstm.executeBatch(); + connection.commit(); + nums = 0; + } + } catch (Exception e) { + logger.error("+++++++++insert to " + RealtimeCountConfig.TABLE_NAME + " Log write failed!!!+++++++++"); + e.printStackTrace(); + } finally { + manger.clear(pstm, connection); + } + } + +} diff --git a/src/main/java/cn/ac/iie/dao/DataBasePzBusiness.java b/src/main/java/cn/ac/iie/dao/DataBasePzBusiness.java new file mode 100644 index 0000000..fd19201 --- /dev/null +++ b/src/main/java/cn/ac/iie/dao/DataBasePzBusiness.java @@ -0,0 +1,122 @@ +package cn.ac.iie.dao; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import org.apache.log4j.Logger; + +import cn.ac.iie.bean.PzTable; + +public final class DataBasePzBusiness { + private static final JdbcPzConnectionManager pzManager = JdbcPzConnectionManager.getInstance(); + private static final Logger logger = Logger.getLogger(DataBasePzBusiness.class); + private Connection connection; + private Statement stmt = null; + + public DataBasePzBusiness() { + + } + + public void getPzToMap(Long seq) { + String queryOptions = " COMPILE_ID, CONT_TYPE, ATTR_TYPE, CONT_LABEL, Task_id, Guarantee_ID, AFFAIR_ID, TOPIC_ID, IS_VALID, PROC_SEQ"; + String sql = "select " + queryOptions + " from config_compile where PROC_SEQ > " + seq; + try { + connection = pzManager.getConnection("idb"); + connection.setAutoCommit(false); + stmt = connection.createStatement(); + } catch (Exception e) { + e.printStackTrace(); + pzManager.clear(null, stmt, null); + if (connection != null) { + pzManager.freeConnection("idb", connection); + } + return; + } + try { + ResultSet rs = stmt.executeQuery(sql); + Long seqMax = seq; + while (rs.next()) { + try { + String sCOMPILE_ID = String.valueOf(rs.getLong(1)); + + Object oCONT_TYPE = rs.getObject(2); + if (oCONT_TYPE == null) + oCONT_TYPE = ""; + String sCONT_TYPE = oCONT_TYPE.toString(); + if (sCONT_TYPE == null || sCONT_TYPE.equals("")) + sCONT_TYPE = "-"; + + Object oATTR_TYPE = rs.getObject(3); + if (oATTR_TYPE == null) + oATTR_TYPE = ""; + String sATTR_TYPE = oATTR_TYPE.toString(); + if (sATTR_TYPE == null || sATTR_TYPE.equals("")) + sATTR_TYPE = "-"; + + Object oCONT_LABEL = rs.getObject(4); + if (oCONT_LABEL == null) + oCONT_LABEL = ""; + String sCONT_LABEL = oCONT_LABEL.toString(); + if (sCONT_LABEL == null || sCONT_LABEL.equals("")) + sCONT_LABEL = "-"; + + Object oTask_id = rs.getObject(5); + if (oTask_id == null) + oTask_id = ""; + String sTask_id = oTask_id.toString(); + if (sTask_id == null || sTask_id.equals("")) + sTask_id = "-"; + + Object oGuarantee_ID = rs.getObject(6); + if (oGuarantee_ID == null) + oGuarantee_ID = ""; + String sGuarantee_ID = oGuarantee_ID.toString(); + if (sGuarantee_ID == null || sGuarantee_ID.equals("")) + sGuarantee_ID = "-"; + + Object oAFFAIR_ID = rs.getObject(7); + if (oAFFAIR_ID == null) + oAFFAIR_ID = ""; + String sAFFAIR_ID = oAFFAIR_ID.toString(); + if (sAFFAIR_ID == null || sAFFAIR_ID.equals("")) + sAFFAIR_ID = "-"; + + Object oTOPIC_ID = rs.getObject(8); + if (oTOPIC_ID == null) + oTOPIC_ID = ""; + String sTOPIC_ID = oTOPIC_ID.toString(); + if (sTOPIC_ID == null || sTOPIC_ID.equals("")) + sTOPIC_ID = "-"; + + Integer iIS_VALID = rs.getInt(9); + + Long lPROC_SEQ = rs.getLong(10); + + if (lPROC_SEQ > seqMax) { + seqMax = lPROC_SEQ; + } + + if (PzTable.seq == 0 && iIS_VALID == 0) { + continue; + } + String value = sCOMPILE_ID + "\t" + sCONT_TYPE + "\t" + sATTR_TYPE + "\t" + sCONT_LABEL + "\t" + + sTask_id + "\t" + sGuarantee_ID + "\t" + sAFFAIR_ID + "\t" + sTOPIC_ID + "\t" + iIS_VALID; + logger.info("put in map: "+value); + PzTable.pzMap.put(sCOMPILE_ID, value); + } catch (Exception e) { + e.printStackTrace(); + } + } + + PzTable.seq = seqMax; + logger.info("PZ UPDATE DONE."); + } catch (Exception e) { + e.printStackTrace(); + } finally { + pzManager.clear(null, stmt, null); + if (connection != null) { + pzManager.freeConnection("idb", connection); + } + } + } +} diff --git a/src/main/java/cn/ac/iie/dao/DbConnect.java b/src/main/java/cn/ac/iie/dao/DbConnect.java new file mode 100644 index 0000000..0635e04 --- /dev/null +++ b/src/main/java/cn/ac/iie/dao/DbConnect.java @@ -0,0 +1,102 @@ +package cn.ac.iie.dao; + + +import com.alibaba.druid.pool.DruidDataSource; +import com.alibaba.druid.pool.DruidPooledConnection; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Properties; + +/** + * Druid连接信息 + * + * @author antlee + * @date 2018/8/20 + */ +public class DbConnect { + private static DruidDataSource dataSource = null; + private static DbConnect dbConnect = null; + private static Properties props = new Properties(); + + static { + getDbConnect(); + } + + private static void getDbConnect() { + try { + if (dataSource == null) { + dataSource = new DruidDataSource(); + props.load(DbConnect.class.getClassLoader().getResourceAsStream("clickhouse.properties")); + //设置连接参数 + dataSource.setUrl("jdbc:clickhouse://" + props.getProperty("db.id")); + dataSource.setDriverClassName(props.getProperty("drivers")); + dataSource.setUsername(props.getProperty("mdb.user")); + dataSource.setPassword(props.getProperty("mdb.password")); + //配置初始化大小、最小、最大 + dataSource.setInitialSize(Integer.parseInt(props.getProperty("initialsize"))); + dataSource.setMinIdle(Integer.parseInt(props.getProperty("minidle"))); + dataSource.setMaxActive(Integer.parseInt(props.getProperty("maxactive"))); + //连接泄漏监测 + dataSource.setRemoveAbandoned(true); + dataSource.setRemoveAbandonedTimeout(30); + dataSource.setDefaultAutoCommit(false); + //配置获取连接等待超时的时间 + dataSource.setMaxWait(30000); + //配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 + dataSource.setTimeBetweenEvictionRunsMillis(2000); + //防止过期 + dataSource.setValidationQuery("SELECT 1"); + dataSource.setTestWhileIdle(true); + dataSource.setTestOnBorrow(true); + dataSource.setKeepAlive(true); + } + } catch (Exception e) { + e.printStackTrace(); + + } + } + + /** + * 数据库连接池单例 + * + * @return dbConnect + */ + public static synchronized DbConnect getInstance() { + if (null == dbConnect) { + dbConnect = new DbConnect(); + } + return dbConnect; + } + + /** + * 返回druid数据库连接 + * + * @return 连接 + * @throws SQLException sql异常 + */ + public DruidPooledConnection getConnection() throws SQLException { + return dataSource.getConnection(); + } + + /** + * 清空PreparedStatement、Connection对象,未定义的置空。 + * + * @param pstmt PreparedStatement对象 + * @param connection Connection对象 + */ + public void clear(PreparedStatement pstmt, Connection connection) { + try { + if (pstmt != null) { + pstmt.close(); + } + if (connection != null) { + connection.close(); + } + } catch (SQLException e) { + e.printStackTrace(); + } + + } +}
\ No newline at end of file diff --git a/src/main/java/cn/ac/iie/dao/JdbcConnectionManager.java b/src/main/java/cn/ac/iie/dao/JdbcConnectionManager.java new file mode 100644 index 0000000..ef369f7 --- /dev/null +++ b/src/main/java/cn/ac/iie/dao/JdbcConnectionManager.java @@ -0,0 +1,392 @@ +package cn.ac.iie.dao; + +import java.io.FileNotFoundException; +import java.io.InputStream; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Date; +import java.util.Enumeration; +import java.util.Hashtable; +import java.util.Properties; +import java.util.StringTokenizer; +import java.util.Vector; + +import com.nis.exception.DaoException; + + + + + +/** + * + * <p>JDBC 连接管理类</p> + * <p>使用该类需要具备两个条件: + * <li>1.数据库驱动jar包</li> + * <li>2.在src目录下生成db.properties,具有如下内容 + * drivers=* 数据库连接驱动类 + * idb.url=* 数据库连接URL + * idb.maxconn=* 线程最大连接数 + * idb.user=* 数据库连接用户名 + * idb.password=* 数据库连接密码 + * </li> + * </p> + * @author 中科智源育成信息有限公司 E-mail: [email protected] + * @version 1.0 创建时间:2010-11-8 下午04:54:15 + * + */ +public final class JdbcConnectionManager { + private static final JdbcConnectionManager jdbcConnectionManager = new JdbcConnectionManager() ; // 唯一实例 + private static int clients; + private Vector drivers = new Vector(); + private Hashtable pools = new Hashtable(); + + //抑制默认的构造器,避免实例化对象 + private JdbcConnectionManager(){ + init(); + } + + /** + * 读取属性完成初始化 + */ + private void init() { + + //InputStream is = getClass().getResourceAsStream("/db.properties"); + + Properties dbProps = new Properties(); + try { + InputStream is = JdbcConnectionManager.class.getClassLoader().getResourceAsStream("db.properties"); + //InputStream is = new FileInputStream(System.getProperty("user.dir")+File.separator+"config"+File.separator+"db.properties"); + dbProps.load(is); + System.out.println("读取数据成功!"); + } catch (Exception e) { + System.err.println("不能读取属性文件. " + + "请确保db.properties在CLASSPATH指定的路径中"); + throw new RuntimeException(new FileNotFoundException("unknow db.properties")); + } + loadDrivers(dbProps); + createPools(dbProps); + } + + /** + * 装载和注册所有JDBC驱动程序 + * + * @param props + * 属性 + */ + private void loadDrivers(Properties props) { + String driverClasses = props.getProperty("drivers"); + StringTokenizer st = new StringTokenizer(driverClasses); + while (st.hasMoreElements()) { + String driverClassName = st.nextToken().trim(); + + try { + + Driver driver = (Driver) Class.forName(driverClassName) + .newInstance(); + DriverManager.registerDriver(driver); + drivers.addElement(driver); + } catch (Exception e) { + System.out.println("无法装载驱动,异常信息:" + e.getMessage()); + } + } + } + + /** + * 根据指定属性创建连接池实例. + * + * @param props + * 连接池属性 + */ + private void createPools(Properties props) { + Enumeration propNames = props.propertyNames(); + while (propNames.hasMoreElements()) { + String name = (String) propNames.nextElement(); + if (name.endsWith(".url")) { + String poolName = name.substring(0, name.lastIndexOf(".")); + String url = props.getProperty(poolName + ".url"); + if (url == null) { + System.out.println("没有为连接池" + poolName + "指定URL"); + continue; + } + String user = props.getProperty(poolName + ".user"); + String password = props.getProperty(poolName + ".password"); + String maxconn = props.getProperty(poolName + ".maxconn", "0"); + + int max; + try { + max = Integer.valueOf(maxconn).intValue(); + } catch (NumberFormatException e) { + System.out.println("错误的最大连接数限制: " + maxconn + " .连接池: " + + poolName); + max = 0; + } + DBConnectionPool pool = new DBConnectionPool(poolName, url, + user, password, max); + pools.put(poolName, pool); + + } + } + } + + + /** + * 返回singleton实例.如果是第一次调用此方法,则创建实例 + * + * @return JdbcConnectionManager 唯一实例 + */ + public static synchronized JdbcConnectionManager getInstance() { + clients++; + return jdbcConnectionManager; + } + + + + /** + * 将连接对象返回给由名字指定的连接池 + * + * @param name + * 在属性文件中定义的连接池名字 + * @param conn + * 连接对象 + */ + public void freeConnection(String name, Connection conn) { + DBConnectionPool pool = (DBConnectionPool) pools.get(name); + if (pool != null) { + pool.freeConnection(conn); + } + } + + /** + * 获得一个可用的(空闲的)连接.如果没有可用连接,且已有连接数小于最大连接数 限制,则创建并返回新连接 + * + * @param name + * 在属性文件中定义的连接池名字 + * @return Connection 可用连接或null + */ + public Connection getConnection(String name) throws DaoException{ + DBConnectionPool pool = (DBConnectionPool) pools.get(name); + if (pool != null) { + return pool.getConnection(); + } + return null; + } + + /** + * 获得一个可用连接.若没有可用连接,且已有连接数小于最大连接数限制, 则创建并返回新连接.否则,在指定的时间内等待其它线程释放连接. + * + * @param name + * 连接池名字 + * @param time + * 以毫秒计的等待时间 + * @return Connection 可用连接或null + */ + public Connection getConnection(String name, long time)throws DaoException { + DBConnectionPool pool = (DBConnectionPool) pools.get(name); + if (pool != null) { + return pool.getConnection(time); + } + return null; + } + + //返回多少个连接 + private int getClient() { + return clients; + } + + /** + * 关闭所有连接,撤销驱动程序的注册 + */ + public synchronized void release() { + // 等待直到最后一个客户程序调用 + if (--clients != 0) { + return; + } + + Enumeration allPools = pools.elements(); + while (allPools.hasMoreElements()) { + DBConnectionPool pool = (DBConnectionPool) allPools.nextElement(); + pool.release(); + } + + Enumeration allDrivers = drivers.elements(); + while (allDrivers.hasMoreElements()) { + Driver driver = (Driver) allDrivers.nextElement(); + try { + DriverManager.deregisterDriver(driver); + // System.out.println("撤销JDBC驱动程序 " + driver.getClass().getName() + // + "的注册"); + } catch (SQLException e) { + System.out.println("无法撤销下列JDBC驱动程序的注册: " + + driver.getClass().getName() + "。错误信息:" + + e.getMessage()); + } + } + } + + /** + * + * 清空PreparedStatement、Statement、ResultSet对象,未定义的置空。 + * @param pstmt PreparedStatement对象 + * @param stmt Statement对象 + * @param rs ResultSet对象 + */ + public void clear(PreparedStatement pstmt,Statement stmt,ResultSet rs) { + try { + if (stmt != null) { + stmt.close(); + } + if (rs != null) { + rs.close(); + } + if (pstmt != null) { + pstmt.close(); + } + } catch (SQLException e) { + e.printStackTrace(); + } + + } + + + /** + * 此内部类定义了一个连接池.它能够根据要求创建新连接,直到预定的最大连接数为止.在返回连接给客户程序之前,它能够验证连接的有效性. + */ + class DBConnectionPool { + private int checkedOut; + private Vector freeConnections = new Vector(); + private int maxConn; + private String name; + private String password; + private String URL; + private String user; + + /** + * 创建新的连接池 + * + * @param name + * 连接池名字 + * @param URL + * 数据库的JDBC URL + * @param user + * 数据库帐号,或 null + * @param password + * 密码,或 null + * @param maxConn + * 此连接池允许建立的最大连接数 + */ + public DBConnectionPool(String name, String URL, String user, + String password, int maxConn) { + this.name = name; + this.URL = URL; + this.user = user; + this.password = password; + this.maxConn = maxConn; + } + + /** + * 将不再使用的连接返回给连接池 + * + * @param con + * 客户程序释放的连接 + */ + public synchronized void freeConnection(Connection con) { + // 将指定连接加入到向量末尾 + freeConnections.addElement(con); + checkedOut--; + //release(); + notifyAll(); + } + + /** + * 从连接池获得一个可用连接.如没有空闲的连接且当前连接数小于最大连接 数限制,则创建新连接.如原来登记为可用的连接不再有效,则从向量删除之, + * 然后递归调用自己以尝试新的可用连接. + */ + public synchronized Connection getConnection() throws DaoException{ + Connection con = null; + if (freeConnections.size() > 0) { + // 获取向量中第一个可用连接 + con = (Connection) freeConnections.firstElement(); + freeConnections.removeElementAt(0); + try { + if (con.isClosed()) { + // 递归调用自己,尝试再次获取可用连接 + con = getConnection(); + } + } catch (SQLException e) { + // 递归调用自己,尝试再次获取可用连接 + con = getConnection(); + } + } else if (maxConn == 0 || checkedOut < maxConn) { + con = newConnection(); + } + if (con != null) { + checkedOut++; + } + return con; + } + + /** + * 从连接池获取可用连接.可以指定客户程序能够等待的最长时间 参见前一个getConnection()方法. + * + * @param timeout + * 以毫秒计的等待时间限制 + */ + public synchronized Connection getConnection(long timeout) throws DaoException { + long startTime = new Date().getTime(); + Connection con; + while ((con = getConnection()) == null) { + try { + wait(timeout); + } catch (InterruptedException e) { + } + if ((new Date().getTime() - startTime) >= timeout) { + // wait()返回的原因是超时 + return null; + } + } + return con; + } + + /** + * 关闭所有连接 + */ + public synchronized void release() { + Enumeration allConnections = freeConnections.elements(); + while (allConnections.hasMoreElements()) { + Connection con = (Connection) allConnections.nextElement(); + try { + con.close(); + } catch (SQLException e) { + System.out.println("无法关闭连接池" + name + "中的连接,错误信息:" + + e.getMessage()); + } + } + freeConnections.removeAllElements(); + } + + /** + * 创建新的连接 + */ + private Connection newConnection() throws DaoException { + Connection con = null; + try { + if (user == null) { + con = DriverManager.getConnection(URL); + } else { + con = DriverManager.getConnection(URL, user, password); + } + System.out.println("连接池" + name + "创建一个新的连接"); + } catch (SQLException e) { + throw new DaoException("无法创建下列URL的连接:" + URL + "\n错误信息:"+ e.getMessage()); + } + //freeConnections.addElement(con); + return con; + } + } + +} diff --git a/src/main/java/cn/ac/iie/dao/JdbcPzConnectionManager.java b/src/main/java/cn/ac/iie/dao/JdbcPzConnectionManager.java new file mode 100644 index 0000000..f35d5d3 --- /dev/null +++ b/src/main/java/cn/ac/iie/dao/JdbcPzConnectionManager.java @@ -0,0 +1,392 @@ +package cn.ac.iie.dao; + +import java.io.FileNotFoundException; +import java.io.InputStream; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Date; +import java.util.Enumeration; +import java.util.Hashtable; +import java.util.Properties; +import java.util.StringTokenizer; +import java.util.Vector; + +import com.nis.exception.DaoException; + + + + + +/** + * + * <p>JDBC 连接管理类</p> + * <p>使用该类需要具备两个条件: + * <li>1.数据库驱动jar包</li> + * <li>2.在src目录下生成db.properties,具有如下内容 + * drivers=* 数据库连接驱动类 + * idb.url=* 数据库连接URL + * idb.maxconn=* 线程最大连接数 + * idb.user=* 数据库连接用户名 + * idb.password=* 数据库连接密码 + * </li> + * </p> + * @author 中科智源育成信息有限公司 E-mail: [email protected] + * @version 1.0 创建时间:2010-11-8 下午04:54:15 + * + */ +public final class JdbcPzConnectionManager { + private static final JdbcPzConnectionManager jdbcPzConnectionManager = new JdbcPzConnectionManager() ; // 唯一实例 + private static int clients; + private Vector drivers = new Vector(); + private Hashtable pools = new Hashtable(); + + //抑制默认的构造器,避免实例化对象 + private JdbcPzConnectionManager(){ + init(); + } + + /** + * 读取属性完成初始化 + */ + private void init() { + + //InputStream is = getClass().getResourceAsStream("/db.properties"); + + Properties dbProps = new Properties(); + try { + InputStream is = JdbcPzConnectionManager.class.getClassLoader().getResourceAsStream("db_pz.properties"); + //InputStream is = new FileInputStream(System.getProperty("user.dir")+File.separator+"config"+File.separator+"db.properties"); + dbProps.load(is); + System.out.println("db_pz.properties读取数据成功!"); + } catch (Exception e) { + System.err.println("不能读取属性文件. " + + "请确保db_pz.properties在CLASSPATH指定的路径中"); + throw new RuntimeException(new FileNotFoundException("unknow db_pz.properties")); + } + loadDrivers(dbProps); + createPools(dbProps); + } + + /** + * 装载和注册所有JDBC驱动程序 + * + * @param props + * 属性 + */ + private void loadDrivers(Properties props) { + String driverClasses = props.getProperty("drivers"); + StringTokenizer st = new StringTokenizer(driverClasses); + while (st.hasMoreElements()) { + String driverClassName = st.nextToken().trim(); + + try { + + Driver driver = (Driver) Class.forName(driverClassName) + .newInstance(); + DriverManager.registerDriver(driver); + drivers.addElement(driver); + } catch (Exception e) { + System.out.println("无法装载驱动,异常信息:" + e.getMessage()); + } + } + } + + /** + * 根据指定属性创建连接池实例. + * + * @param props + * 连接池属性 + */ + private void createPools(Properties props) { + Enumeration propNames = props.propertyNames(); + while (propNames.hasMoreElements()) { + String name = (String) propNames.nextElement(); + if (name.endsWith(".url")) { + String poolName = name.substring(0, name.lastIndexOf(".")); + String url = props.getProperty(poolName + ".url"); + if (url == null) { + System.out.println("没有为连接池" + poolName + "指定URL"); + continue; + } + String user = props.getProperty(poolName + ".user"); + String password = props.getProperty(poolName + ".password"); + String maxconn = props.getProperty(poolName + ".maxconn", "0"); + + int max; + try { + max = Integer.valueOf(maxconn).intValue(); + } catch (NumberFormatException e) { + System.out.println("错误的最大连接数限制: " + maxconn + " .连接池: " + + poolName); + max = 0; + } + DBConnectionPool pool = new DBConnectionPool(poolName, url, + user, password, max); + pools.put(poolName, pool); + + } + } + } + + + /** + * 返回singleton实例.如果是第一次调用此方法,则创建实例 + * + * @return JdbcConnectionManager 唯一实例 + */ + public static synchronized JdbcPzConnectionManager getInstance() { + clients++; + return jdbcPzConnectionManager; + } + + + + /** + * 将连接对象返回给由名字指定的连接池 + * + * @param name + * 在属性文件中定义的连接池名字 + * @param conn + * 连接对象 + */ + public void freeConnection(String name, Connection conn) { + DBConnectionPool pool = (DBConnectionPool) pools.get(name); + if (pool != null) { + pool.freeConnection(conn); + } + } + + /** + * 获得一个可用的(空闲的)连接.如果没有可用连接,且已有连接数小于最大连接数 限制,则创建并返回新连接 + * + * @param name + * 在属性文件中定义的连接池名字 + * @return Connection 可用连接或null + */ + public Connection getConnection(String name) throws DaoException{ + DBConnectionPool pool = (DBConnectionPool) pools.get(name); + if (pool != null) { + return pool.getConnection(); + } + return null; + } + + /** + * 获得一个可用连接.若没有可用连接,且已有连接数小于最大连接数限制, 则创建并返回新连接.否则,在指定的时间内等待其它线程释放连接. + * + * @param name + * 连接池名字 + * @param time + * 以毫秒计的等待时间 + * @return Connection 可用连接或null + */ + public Connection getConnection(String name, long time)throws DaoException { + DBConnectionPool pool = (DBConnectionPool) pools.get(name); + if (pool != null) { + return pool.getConnection(time); + } + return null; + } + + //返回多少个连接 + private int getClient() { + return clients; + } + + /** + * 关闭所有连接,撤销驱动程序的注册 + */ + public synchronized void release() { + // 等待直到最后一个客户程序调用 + if (--clients != 0) { + return; + } + + Enumeration allPools = pools.elements(); + while (allPools.hasMoreElements()) { + DBConnectionPool pool = (DBConnectionPool) allPools.nextElement(); + pool.release(); + } + + Enumeration allDrivers = drivers.elements(); + while (allDrivers.hasMoreElements()) { + Driver driver = (Driver) allDrivers.nextElement(); + try { + DriverManager.deregisterDriver(driver); + // System.out.println("撤销JDBC驱动程序 " + driver.getClass().getName() + // + "的注册"); + } catch (SQLException e) { + System.out.println("无法撤销下列JDBC驱动程序的注册: " + + driver.getClass().getName() + "。错误信息:" + + e.getMessage()); + } + } + } + + /** + * + * 清空PreparedStatement、Statement、ResultSet对象,未定义的置空。 + * @param pstmt PreparedStatement对象 + * @param stmt Statement对象 + * @param rs ResultSet对象 + */ + public void clear(PreparedStatement pstmt,Statement stmt,ResultSet rs) { + try { + if (stmt != null) { + stmt.close(); + } + if (rs != null) { + rs.close(); + } + if (pstmt != null) { + pstmt.close(); + } + } catch (SQLException e) { + e.printStackTrace(); + } + + } + + + /** + * 此内部类定义了一个连接池.它能够根据要求创建新连接,直到预定的最大连接数为止.在返回连接给客户程序之前,它能够验证连接的有效性. + */ + class DBConnectionPool { + private int checkedOut; + private Vector freeConnections = new Vector(); + private int maxConn; + private String name; + private String password; + private String URL; + private String user; + + /** + * 创建新的连接池 + * + * @param name + * 连接池名字 + * @param URL + * 数据库的JDBC URL + * @param user + * 数据库帐号,或 null + * @param password + * 密码,或 null + * @param maxConn + * 此连接池允许建立的最大连接数 + */ + public DBConnectionPool(String name, String URL, String user, + String password, int maxConn) { + this.name = name; + this.URL = URL; + this.user = user; + this.password = password; + this.maxConn = maxConn; + } + + /** + * 将不再使用的连接返回给连接池 + * + * @param con + * 客户程序释放的连接 + */ + public synchronized void freeConnection(Connection con) { + // 将指定连接加入到向量末尾 + freeConnections.addElement(con); + checkedOut--; + //release(); + notifyAll(); + } + + /** + * 从连接池获得一个可用连接.如没有空闲的连接且当前连接数小于最大连接 数限制,则创建新连接.如原来登记为可用的连接不再有效,则从向量删除之, + * 然后递归调用自己以尝试新的可用连接. + */ + public synchronized Connection getConnection() throws DaoException{ + Connection con = null; + if (freeConnections.size() > 0) { + // 获取向量中第一个可用连接 + con = (Connection) freeConnections.firstElement(); + freeConnections.removeElementAt(0); + try { + if (con.isClosed()) { + // 递归调用自己,尝试再次获取可用连接 + con = getConnection(); + } + } catch (SQLException e) { + // 递归调用自己,尝试再次获取可用连接 + con = getConnection(); + } + } else if (maxConn == 0 || checkedOut < maxConn) { + con = newConnection(); + } + if (con != null) { + checkedOut++; + } + return con; + } + + /** + * 从连接池获取可用连接.可以指定客户程序能够等待的最长时间 参见前一个getConnection()方法. + * + * @param timeout + * 以毫秒计的等待时间限制 + */ + public synchronized Connection getConnection(long timeout) throws DaoException { + long startTime = new Date().getTime(); + Connection con; + while ((con = getConnection()) == null) { + try { + wait(timeout); + } catch (InterruptedException e) { + } + if ((new Date().getTime() - startTime) >= timeout) { + // wait()返回的原因是超时 + return null; + } + } + return con; + } + + /** + * 关闭所有连接 + */ + public synchronized void release() { + Enumeration allConnections = freeConnections.elements(); + while (allConnections.hasMoreElements()) { + Connection con = (Connection) allConnections.nextElement(); + try { + con.close(); + } catch (SQLException e) { + System.out.println("无法关闭连接池" + name + "中的连接,错误信息:" + + e.getMessage()); + } + } + freeConnections.removeAllElements(); + } + + /** + * 创建新的连接 + */ + private Connection newConnection() throws DaoException { + Connection con = null; + try { + if (user == null) { + con = DriverManager.getConnection(URL); + } else { + con = DriverManager.getConnection(URL, user, password); + } + System.out.println("连接池" + name + "创建一个新的连接"); + } catch (SQLException e) { + throw new DaoException("无法创建下列URL的连接:" + URL + "\n错误信息:"+ e.getMessage()); + } + //freeConnections.addElement(con); + return con; + } + } + +} diff --git a/src/main/java/cn/ac/iie/dao/KafkaDB.java b/src/main/java/cn/ac/iie/dao/KafkaDB.java new file mode 100644 index 0000000..4d15449 --- /dev/null +++ b/src/main/java/cn/ac/iie/dao/KafkaDB.java @@ -0,0 +1,81 @@ +package cn.ac.iie.dao; + +import cn.ac.iie.bean.voipSipFromToLog.RouteRelationLog; +import cn.ac.iie.bean.voipSipOrigin.SipOriginALL; +import cn.ac.iie.common.RealtimeCountConfig; +import com.alibaba.fastjson.JSONObject; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.log4j.Logger; + +import java.text.SimpleDateFormat; +import java.util.LinkedList; +import java.util.Properties; + +/** + * 发送数据的kafka工具类 + */ +public class KafkaDB { + private static Logger logger = Logger.getLogger(KafkaDB.class); + + private static Producer<String, String> producer; + + private static KafkaDB kafkaDB; + + private KafkaDB() { + getProducer(); + } + + public static KafkaDB getInstance() { + if (kafkaDB == null) { + kafkaDB = new KafkaDB(); + } + return kafkaDB; + } + + public void siporiLog2KafkaFromSipInsertBoltDC(LinkedList<String> sipOriJsonS) { + long time = System.currentTimeMillis() / 1000L; + for (String sipOriJson : sipOriJsonS) { + try { + SipOriginALL sipOriginLog = JSONObject.parseObject(sipOriJson, SipOriginALL.class); + sipOriginLog.setStat_time(time); + producer.send(new ProducerRecord<>(RealtimeCountConfig.KAFKA_SIP_COMPLEMENT_TOPIC, JSONObject.toJSONString(sipOriginLog))); + } catch (Exception e) { + e.printStackTrace(); + } + } + producer.flush(); + } + + public void routeRelatLog2KafkaFromSipInsertBoltDC(LinkedList<String> routeJsonS) { + long time = System.currentTimeMillis() / 1000L; + for (String routeJson : routeJsonS) { + try { + RouteRelationLog routeRelationLog = JSONObject.parseObject(routeJson, RouteRelationLog.class); + routeRelationLog.setTimestamp(time); + producer.send(new ProducerRecord<>(RealtimeCountConfig.KAFKA_ROUTE_RELATION_TOPIC, JSONObject.toJSONString(routeRelationLog))); + } catch (Exception e) { + e.printStackTrace(); + } + } + producer.flush(); + } + + /** + * 根据kafka生产者配置信息初始化kafka消息生产者,只初始化一次 + */ + private void getProducer() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", RealtimeCountConfig.BOOTSTRAP_OUTPUT_SERVERS); + properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("acks", "1"); + properties.put("linger.ms", "2"); + properties.put("request.timeout.ms", 20000); + properties.put("batch.size", 262144); + properties.put("buffer.memory", 33554432); + producer = new KafkaProducer<String, String>(properties); + } + +} |
