Diffstat (limited to 'src')
 src/main/java/cn/ac/iie/bolt/SipInsertBoltDC.java             | 136 -------------
 src/main/java/cn/ac/iie/common/RealtimeCountConfig.java       |   6 +-
 src/main/java/cn/ac/iie/utils/HiveDao/HdfsDataLoad_Avro.java  | 240 -------------
 src/main/java/cn/ac/iie/utils/HiveDao/HiveDataSourceUtil.java | 187 -------------
 4 files changed, 3 insertions(+), 566 deletions(-)
diff --git a/src/main/java/cn/ac/iie/bolt/SipInsertBoltDC.java b/src/main/java/cn/ac/iie/bolt/SipInsertBoltDC.java
deleted file mode 100644
index d09243c..0000000
--- a/src/main/java/cn/ac/iie/bolt/SipInsertBoltDC.java
+++ /dev/null
@@ -1,136 +0,0 @@
-package cn.ac.iie.bolt;
-
-import cn.ac.iie.common.RealtimeCountConfig;
-import cn.ac.iie.dao.KafkaDB;
-import cn.ac.iie.utils.HiveDao.HdfsDataLoad_Avro;
-import cn.ac.iie.utils.TupleUtils;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.log4j.Logger;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-
-public class SipInsertBoltDC extends BaseBasicBolt {
-
-    private static final long serialVersionUID = -6795251425357896415L;
-    private static Logger logger = Logger.getLogger(SipInsertBoltDC.class);
-
-    private LinkedList<String> sipOriJsonList;          // buffer for raw SIP completion log JSON strings
-    private LinkedList<String> routeRelationJsonList;   // buffer for VoIP route-relation log strings
-
-    private HdfsDataLoad_Avro hdfsDataLoadAvro;
-
-    private Integer tickFreqSecs;
-
-    public SipInsertBoltDC(Integer tickFreqSecs) {
-        this.tickFreqSecs = tickFreqSecs;
-    }
-
-    @SuppressWarnings("rawtypes")
-    @Override
-    public void prepare(Map stormConf,
-                        TopologyContext context) {
-        hdfsDataLoadAvro = HdfsDataLoad_Avro.getHdfsInstance();
-        sipOriJsonList = new LinkedList<String>();
-        routeRelationJsonList = new LinkedList<String>();
-    }
-
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-        try {
-            if (TupleUtils.isTick(tuple)) {
-                long time = System.currentTimeMillis() / 1000L;
-                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
-                String partition = sdf.format(new Date(time * 1000L)).replaceAll("-", ""); // format: 20190722
-                if (!sipOriJsonList.isEmpty()) {
-                    LinkedList<String> tmpListFreq = new LinkedList<String>();
-                    tmpListFreq.addAll(sipOriJsonList);
-                    sipOriJsonList.clear();
-                    hdfsDataLoadAvro.dataSipToHdfsAvro(partition, tmpListFreq, RealtimeCountConfig.KAFKA_SIP_COMPLEMENT_TOPIC, "origin", time);
-                }
-
-                // Periodic write of VoIP route-relation logs --- retired 2019-08-07; route relations are now produced by offline Spark cleaning
-//                if (!routeRelationJsonList.isEmpty()) {
-////                    Map<String, Long> tmpMap = new HashMap<String, Long>();
-//                    LinkedList<String> tmpFragListFreq = new LinkedList<String>();
-//                    tmpFragListFreq.addAll(routeRelationJsonList);
-//                    routeRelationJsonList.clear();
-//                    kafkaDB.routeRelatLog2KafkaFromSipInsertBoltDC(tmpFragListFreq);
-////                    dcl.dfPzFlowBatchStorage(tmpMap);             // production path: write directly to the central HTTP endpoint (verified working)
-////                    dbl.dfPzFlowBatchStorage2CH(tmpMap);          // test write to ClickHouse, 2019-02-20
-////                    dbl.dfPzFlowBatchStorage2CH(tmpFragListFreq); // test write to ClickHouse, 2019-02-20
-//                }
-            } else {
-                String jsonLog = tuple.getString(0);
-                String logType = tuple.getString(1);
-                switch (logType) {
-                    case "origin":
-                        if (StringUtil.isNotBlank(jsonLog)) {
-                            sipOriJsonList.add(jsonLog);
-                            collector.emit(new Values(jsonLog));
-                        }
-                        break;
-//                    case "route": // route-relation data --- retired 2019-08-07; route relations are now produced by offline Spark cleaning
-//                        if (StringUtil.isNotBlank(jsonLog)) {
-//                            routeRelationJsonList.add(jsonLog);
-//                        }
-//                        break;
-                    default:
-                        logger.error("SipInsertBoltDC logType is error !!!This logType is--->{" + logType + "}<---");
-                        break;
-                }
-                long time = System.currentTimeMillis() / 1000L;
-                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
-                String partition = sdf.format(new Date(time * 1000L)).replaceAll("-", "");
-                if (sipOriJsonList.size() >= RealtimeCountConfig.BATCH_KAFKA_INSERT_NUM) {
-                    LinkedList<String> tmpList = new LinkedList<String>();
-                    tmpList.addAll(sipOriJsonList);
-                    sipOriJsonList.clear();
-                    hdfsDataLoadAvro.dataSipToHdfsAvro(partition, tmpList, RealtimeCountConfig.KAFKA_SIP_COMPLEMENT_TOPIC, "origin", time);
-                }
-                // Write VoIP route-relation logs --- retired 2019-08-07; route relations are now produced by offline Spark cleaning
-//                if (routeRelationJsonList.size() >= RealtimeCountConfig.BATCH_KAFKA_INSERT_NUM) {
-//                    LinkedList<String> tmpRouteList = new LinkedList<String>();
-//                    tmpRouteList.addAll(routeRelationJsonList);
-//                    routeRelationJsonList.clear();
-////                    dbl.dfPzFlowBatchStorage2CH(tmpRouteList); // test write to ClickHouse, 2019-02-20
-//                    kafkaDB.routeRelatLog2KafkaFromSipInsertBoltDC(tmpRouteList);
-//                }
-
-            }
-        } catch (Exception e) {
-            logger.error("SipInsertBoltDC to insert is error !!!--->{" + e + "}<---");
-            e.printStackTrace();
-        }
-    }
-
-    private void logCount(String key, Map<String, Long> hm) {
-        if (hm.containsKey(key)) {
-            hm.put(key, hm.get(key) + 1);
-        } else {
-            hm.put(key, 1l);
-        }
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        Map<String, Object> conf = new HashMap<String, Object>();
-        conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFreqSecs);
-        return conf;
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("countJsonLog"));
-    }
-}
\ No newline at end of file
diff --git a/src/main/java/cn/ac/iie/common/RealtimeCountConfig.java b/src/main/java/cn/ac/iie/common/RealtimeCountConfig.java
index 373a4a3..59f6c57 100644
--- a/src/main/java/cn/ac/iie/common/RealtimeCountConfig.java
+++ b/src/main/java/cn/ac/iie/common/RealtimeCountConfig.java
@@ -57,10 +57,11 @@ public class RealtimeCountConfig implements Serializable{
     public static final String HDFS_PATH = RealtimeCountConfigurations.getStringProperty(0,"hdfs.path");
     public static final String HDFS_USER = RealtimeCountConfigurations.getStringProperty(0,"hdfs.user");
 // public static final String HIVE_URL = RealtimeCountConfigurations.getStringProperty(0,"hive.url");
-// public static final String HIVE_USERNAME = RealtimeCountConfigurations.getStringProperty(0,"hive.username");
-// public static final String HIVE_PASSWORD = RealtimeCountConfigurations.getStringProperty(0,"hive.password");
+    // public static final String HIVE_USERNAME = RealtimeCountConfigurations.getStringProperty(0,"hive.username");
+    // public static final String HIVE_PASSWORD = RealtimeCountConfigurations.getStringProperty(0,"hive.password");
     public static final String HIVE_SIP_CLEAN_TABLE = RealtimeCountConfigurations.getStringProperty(0,"hive.sip.clean.table");
 // public static final String HIVE_SIP_ROUTE_TABLE = RealtimeCountConfigurations.getStringProperty(0,"hive.sip.route.table");
+    public static final Integer PEAK_LOAD_SIP_PERCENT = RealtimeCountConfigurations.getIntProperty(0, "peak.load.sip.percent");

     //---------------storm_config.properties---------------
     public static final Integer SPOUT_PARALLELISM = RealtimeCountConfigurations.getIntProperty(1, "spout.parallelism");
@@ -77,7 +78,6 @@ public class RealtimeCountConfig implements Serializable{
     public static final Integer TOPOLOGY_TICK_TUPLE_MERGE_FREQ_SECS = RealtimeCountConfigurations.getIntProperty(1, "topology.tick.tuple.merge.freq.secs");
     public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = RealtimeCountConfigurations.getIntProperty(1, "topology.config.max.spout.pending");
     public static final Integer TOPOLOGY_NUM_ACKS = RealtimeCountConfigurations.getIntProperty(1, "topology.num.acks");
-    public static final Integer PEAK_LOAD_SIP_PERCENT = RealtimeCountConfigurations.getIntProperty(10, "peak.load.sip.percent");

     // display the parameters
     public static void configShow(){
diff --git a/src/main/java/cn/ac/iie/utils/HiveDao/HdfsDataLoad_Avro.java b/src/main/java/cn/ac/iie/utils/HiveDao/HdfsDataLoad_Avro.java
deleted file mode 100644
index 152bc4d..0000000
--- a/src/main/java/cn/ac/iie/utils/HiveDao/HdfsDataLoad_Avro.java
+++ /dev/null
@@ -1,240 +0,0 @@
-package cn.ac.iie.utils.HiveDao;
-
-import cn.ac.iie.bean.voipSipOrigin.SipOriginALL;
-import cn.ac.iie.common.RealtimeCountConfig;
-import com.alibaba.druid.pool.DruidDataSource;
-import com.alibaba.fastjson.JSONObject;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.avro.Schema;
-import org.apache.avro.file.CodecFactory;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-import java.net.URI;
-import java.sql.Connection;
-import java.sql.Statement;
-import java.util.LinkedList;
-import java.util.UUID;
-
-/**
- * Loader that writes Avro-formatted data to HDFS and loads it into Hive
- *
- * @author Colbert
- */
-public class HdfsDataLoad_Avro {
-
-    private static Logger logger = Logger.getLogger(HdfsDataLoad_Avro.class);
-    private static DruidDataSource ds = HiveDataSourceUtil.getHiveDataSource();
-    private Connection con = null;
-    private Statement stmt = null;
-    private FSDataOutputStream outputStream_avro = null;
-
-    private DataFileWriter<GenericRecord> fileWriter = null;
-
-    private static HdfsDataLoad_Avro hdfsDataLoad_avro = null;
-    private static FileSystem fileSystem = null;
-
-    private HdfsDataLoad_Avro() {
-        getHdfsConnection();
-    }
-
-    public static HdfsDataLoad_Avro getHdfsInstance() {
-        if (hdfsDataLoad_avro == null || fileSystem == null) {
-            hdfsDataLoad_avro = new HdfsDataLoad_Avro();
-        }
-        return hdfsDataLoad_avro;
-    }
-
-    /**
-     * Create the HDFS connection
-     */
-    private void getHdfsConnection() {
-        try {
-            Configuration conf = new Configuration();
-            conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
-            conf.setBoolean("dfs.support.append", true);
-            conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
-            conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", true);
-            fileSystem = FileSystem.get(new URI(RealtimeCountConfig.HDFS_URL), conf, RealtimeCountConfig.HDFS_USER);
-        } catch (Exception e) {
-            logger.error("HdfsDataLoad_Avro getHdfsConnection method is error !!!--->{" + e + "}<---");
-            e.printStackTrace();
-        }
-    }
-
-
-    public void dataSipToHdfsAvro(String partition, LinkedList<String> data, String topicName, String logType, Long timeSend) {
-        long time = timeSend;
-        String uuid = UUID.randomUUID().toString().replaceAll("-", "").toLowerCase();
-        String hdfs_path = RealtimeCountConfig.HDFS_URL + RealtimeCountConfig.HDFS_PATH + topicName.toLowerCase() + "/" + partition + "/" + topicName.toLowerCase() + "-" + uuid + ".avro"; // format: hdfs://ns1/input/frag-media-expire-log/20190730/frag-media-expire-log-d84772e8257048f3be1ca82f8e35f215.avro
-        try {
-            Path path = new Path(hdfs_path);
-            fileSystem.createNewFile(path);
-            outputStream_avro = fileSystem.append(path);
-
-            switch (logType) {
-                case "origin":
-                    String schemaJsonSip = "{\"type\": \"record\",\"name\": \"siplog\",\"fields\": [{\"name\": \"call_id\", \"type\": [\"string\", \"null\"]},{\"name\": \"clj_ip\", \"type\": [\"string\", \"null\"]},{\"name\": \"found_time\", \"type\": \"int\"},{\"name\": \"src_ip\", \"type\": [\"string\", \"null\"]},{\"name\": \"src_location_nation\", \"type\": [\"string\", \"null\"]},{\"name\": \"src_location_nation_code\", \"type\": [\"string\", \"null\"]},{\"name\": \"src_location_region\", \"type\": [\"string\", \"null\"]},{\"name\": \"src_port\", \"type\": \"int\"},{\"name\": \"dst_ip\", \"type\": [\"string\", \"null\"]},{\"name\": \"ip_type\", \"type\": [\"string\", \"null\"]},{\"name\": \"dst_location_nation\", \"type\": [\"string\", \"null\"]},{\"name\": \"dst_location_nation_code\", \"type\": [\"string\", \"null\"]},{\"name\": \"dst_location_region\", \"type\": [\"string\", \"null\"]},{\"name\": \"dst_port\", \"type\": \"int\"},{\"name\": \"method\", \"type\": [\"string\", \"null\"]},{\"name\": \"request_uri\", \"type\": [\"string\", \"null\"]},{\"name\": \"user_name\", \"type\": [\"string\", \"null\"]},{\"name\": \"service_domain\", \"type\": [\"string\", \"null\"]},{\"name\": \"service_domain_valid\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_stat\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_stat_format\", \"type\": [\"string\", \"null\"]},{\"name\": \"from\", \"type\": [\"string\", \"null\"]},{\"name\": \"from_nickname\", \"type\": [\"string\", \"null\"]},{\"name\": \"from_usr_name\", \"type\": [\"string\", \"null\"]},{\"name\": \"from_ser_domain\", \"type\": [\"string\", \"null\"]},{\"name\": \"from_ser_domain_valid\", \"type\": [\"string\", \"null\"]},{\"name\": \"from_tag\", \"type\": [\"string\", \"null\"]},{\"name\": \"to\", \"type\": [\"string\", \"null\"]},{\"name\": \"to_nickname\", \"type\": [\"string\", \"null\"]},{\"name\": \"to_usr_name\", \"type\": [\"string\", \"null\"]},{\"name\": \"to_ser_domain\", \"type\": [\"string\", \"null\"]},{\"name\": \"to_ser_domain_valid\", \"type\": [\"string\", \"null\"]},{\"name\": \"to_tag\", \"type\": [\"string\", \"null\"]},{\"name\": \"cseq\", \"type\": [\"string\", \"null\"]},{\"name\": \"cseq_method\", \"type\": [\"string\", \"null\"]},{\"name\": \"user_agent\", \"type\": [\"string\", \"null\"]},{\"name\": \"device_type\", \"type\": [\"string\", \"null\"]},{\"name\": \"max_forwards\", \"type\": [\"string\", \"null\"]},{\"name\": \"server\", \"type\": [\"string\", \"null\"]},{\"name\": \"server_type\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_via_json\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_contact\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_contact_nickname\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_contact_usr_name\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_contact_ser_domain\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_ser_domain_valid\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_record_route_json\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_route_json\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_expires\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_others\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_content_type\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_content\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_via_json\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_contact\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_contact_nickname\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_contact_usr_name\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_contact_ser_domain\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_ser_domain_valid\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_record_route_json\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_route_json\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_expires\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_others\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_content_type\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_content\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_coding\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_coding\", \"type\": [\"string\", \"null\"]},{\"name\": \"stat_time\", \"type\": \"int\"}]}";
-                    Schema schemaSip = new Schema.Parser().parse(schemaJsonSip);
-                    fileWriter = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schemaSip)).setSyncInterval(100);
-                    fileWriter.setCodec(CodecFactory.snappyCodec());
-                    fileWriter.create(schemaSip, outputStream_avro);
-                    for (String dataJson : data) {
-                        SipOriginALL sipOriginAllLog = JSONObject.parseObject(dataJson, SipOriginALL.class);
-                        sipOriginAllLog.setStat_time(time);
-                        GenericRecord recordSip = new GenericData.Record(schemaSip);
-
-                        recordSip.put("call_id", sipOriginAllLog.getCall_ID());
-                        recordSip.put("clj_ip", sipOriginAllLog.getCLJ_IP());
-                        recordSip.put("found_time", sipOriginAllLog.getFound_Time());
-                        recordSip.put("src_ip", sipOriginAllLog.getSRC_IP());
-                        recordSip.put("src_location_nation", sipOriginAllLog.getSRC_LOCATION_NATION());
-                        recordSip.put("src_location_nation_code", sipOriginAllLog.getSRC_LOCATION_NATION_CODE());
-                        recordSip.put("src_location_region", sipOriginAllLog.getSRC_LOCATION_REGION());
-                        recordSip.put("src_port", sipOriginAllLog.getSRC_PORT());
-                        recordSip.put("dst_ip", sipOriginAllLog.getDST_IP());
-                        recordSip.put("ip_type", sipOriginAllLog.getIP_TYPE());
-                        recordSip.put("dst_location_nation", sipOriginAllLog.getDST_LOCATION_NATION());
-                        recordSip.put("dst_location_nation_code", sipOriginAllLog.getDST_LOCATION_NATION_CODE());
-                        recordSip.put("dst_location_region", sipOriginAllLog.getDST_LOCATION_REGION());
-                        recordSip.put("dst_port", sipOriginAllLog.getDST_PORT());
-                        recordSip.put("method", sipOriginAllLog.getMethod());
-                        recordSip.put("request_uri", sipOriginAllLog.getRequest_URI());
-                        recordSip.put("user_name", sipOriginAllLog.getUser_name());
-                        recordSip.put("service_domain", sipOriginAllLog.getService_domain());
-                        recordSip.put("service_domain_valid", sipOriginAllLog.getService_domain_valid());
-                        recordSip.put("res_stat", sipOriginAllLog.getRes_stat());
-                        recordSip.put("res_stat_format", sipOriginAllLog.getRes_stat_format());
-                        recordSip.put("from", sipOriginAllLog.getFrom());
-                        recordSip.put("from_nickname", sipOriginAllLog.getFrom_Nickname());
-                        recordSip.put("from_usr_name", sipOriginAllLog.getFrom_usr_name());
-                        recordSip.put("from_ser_domain", sipOriginAllLog.getFrom_ser_domain());
-                        recordSip.put("from_ser_domain_valid", sipOriginAllLog.getFrom_ser_domain_valid());
-                        recordSip.put("from_tag", sipOriginAllLog.getFrom_tag());
-                        recordSip.put("to", sipOriginAllLog.getTo());
-                        recordSip.put("to_nickname", sipOriginAllLog.getTo_Nickname());
-                        recordSip.put("to_usr_name", sipOriginAllLog.getTo_usr_name());
-                        recordSip.put("to_ser_domain", sipOriginAllLog.getTo_ser_domain());
-                        recordSip.put("to_ser_domain_valid", sipOriginAllLog.getTo_ser_domain_valid());
-                        recordSip.put("to_tag", sipOriginAllLog.getTo_tag());
-                        recordSip.put("cseq", sipOriginAllLog.getCseq());
-                        recordSip.put("cseq_method", sipOriginAllLog.getCseq_method());
-                        recordSip.put("user_agent", sipOriginAllLog.getUser_Agent());
-                        recordSip.put("device_type", sipOriginAllLog.getDevice_type());
-                        recordSip.put("max_forwards", sipOriginAllLog.getMax_Forwards());
-                        recordSip.put("server", sipOriginAllLog.getServer());
-                        recordSip.put("server_type", sipOriginAllLog.getServer_type());
-                        recordSip.put("req_via_json", sipOriginAllLog.getReq_Via_Json());
-                        recordSip.put("req_contact", sipOriginAllLog.getReq_Contact());
-                        recordSip.put("req_contact_nickname", sipOriginAllLog.getReq_Contact_Nickname());
-                        recordSip.put("req_contact_usr_name", sipOriginAllLog.getReq_Contact_usr_name());
-                        recordSip.put("req_contact_ser_domain", sipOriginAllLog.getReq_Contact_ser_domain());
-                        recordSip.put("req_ser_domain_valid", sipOriginAllLog.getReq_ser_domain_valid());
-                        recordSip.put("req_record_route_json", sipOriginAllLog.getReq_Record_Route_Json());
-                        recordSip.put("req_route_json", sipOriginAllLog.getReq_Route_Json());
-                        recordSip.put("req_expires", sipOriginAllLog.getReq_Expires());
-                        recordSip.put("req_others", sipOriginAllLog.getReq_Others());
-                        recordSip.put("req_content_type", sipOriginAllLog.getReq_Content_Type());
-                        recordSip.put("req_content", sipOriginAllLog.getReq_Content());
-                        recordSip.put("res_via_json", sipOriginAllLog.getRes_Via_Json());
-                        recordSip.put("res_contact", sipOriginAllLog.getRes_Contact());
-                        recordSip.put("res_contact_nickname", sipOriginAllLog.getRes_Contact_Nickname());
-                        recordSip.put("res_contact_usr_name", sipOriginAllLog.getRes_Contact_usr_name());
-                        recordSip.put("res_contact_ser_domain", sipOriginAllLog.getRes_Contact_ser_domain());
-                        recordSip.put("res_ser_domain_valid", sipOriginAllLog.getRes_ser_domain_valid());
-                        recordSip.put("res_record_route_json", sipOriginAllLog.getRes_Record_Route_Json());
-                        recordSip.put("res_route_json", sipOriginAllLog.getRes_Route_Json());
-                        recordSip.put("res_expires", sipOriginAllLog.getRes_Expires());
-                        recordSip.put("res_others", sipOriginAllLog.getRes_Others());
-                        recordSip.put("res_content_type", sipOriginAllLog.getRes_Content_Type());
-                        recordSip.put("res_content", sipOriginAllLog.getRes_Content());
-                        recordSip.put("req_coding", sipOriginAllLog.getReq_coding());
-                        recordSip.put("res_coding", sipOriginAllLog.getRes_coding());
-                        recordSip.put("stat_time", sipOriginAllLog.getStat_time());
-
-                        fileWriter.append(recordSip);
-                        fileWriter.flush();
-                    }
-                    break;
-                default:
-                    logger.error("HdfsDataLoad_Avro toHdfs logType is error !!!This logType is--->{" + logType + "}<---");
-                    break;
-            }
-
-            logger.warn("HdfsDataLoad_Avro data to HDFS Successful,hdfs_path is -->{" + hdfs_path + "}<---");
-
-            if (fileWriter != null) {
-                fileWriter.close();
-                fileWriter = null;
-            }
-            if (outputStream_avro != null) {
-                outputStream_avro.close();
-                outputStream_avro = null;
-            }
-
-            switch (logType) {
-                case "origin":
-                    String tablenameSip = RealtimeCountConfig.HIVE_SIP_CLEAN_TABLE;
-                    loadDataToHiveAvro(tablenameSip, partition, hdfs_path);
-                    break;
-//                case "route":
-//                    String tablenameFrag = RealtimeCountConfig.HIVE_SIP_ROUTE_TABLE;
-//                    loadDataToHiveAvro(tablenameFrag, partition, hdfs_path);
-//                    break;
-                default:
-                    logger.error("HdfsDataLoad_Avro toHive logType is error !!!This logType is--->{" + logType + "}<---");
-                    break;
-            }
-
-        } catch (Exception e) {
-            logger.error("HdfsDataLoad_Avro dataToHdfs method is error !!!--->{" + e + "}<---");
-            e.printStackTrace();
-        }
-    }
-
-    public void loadDataToHiveAvro(String tablename, String partition, String hdfs_path) {
-        String dataUrl = hdfs_path;
-        StringBuffer sb = new StringBuffer();
-        try {
-            con = ds.getConnection();
-            stmt = con.createStatement();
-            sb.append("load data inpath ").append("'").append(dataUrl).append("'")
-                    .append(" into table ").append(tablename).append(" partition( time_partition=").append(partition).append(")");
-            stmt.execute(sb.toString());
-
-            logger.warn("HdfsDataLoad_Avro data to Hive Successful,dataUrl is -->{" + dataUrl + "}<---");
-
-        } catch (Exception e) {
-            logger.error("HdfsDataLoad_Avro loadDataToHive method is error !!!--->{" + e + "}<---");
-            e.printStackTrace();
-        } finally {
-            try {
-                if (stmt != null) {
-                    stmt.close();
-                    stmt = null;
-                }
-                if (con != null) {
-                    con.close();
-                    con = null;
-                }
-            } catch (Exception e) {
-                logger.error("HdfsDataLoad_Avro loadDataToHive when close is error !!!--->{" + e + "}<---");
-                e.printStackTrace();
-            }
-        }
-    }
-
-    private String strDefaultValue(String str) {
-        return (StringUtil.isBlank(str)
-                || str == null
-                || str.equals("")
-                || str.length() == 0) ? "$#$" : str;
-    }
-}
diff --git a/src/main/java/cn/ac/iie/utils/HiveDao/HiveDataSourceUtil.java b/src/main/java/cn/ac/iie/utils/HiveDao/HiveDataSourceUtil.java
deleted file mode 100644
index 9223f92..0000000
--- a/src/main/java/cn/ac/iie/utils/HiveDao/HiveDataSourceUtil.java
+++ /dev/null
@@ -1,187 +0,0 @@
-package cn.ac.iie.utils.HiveDao;
-
-import com.alibaba.druid.pool.DruidDataSource;
-import com.alibaba.fastjson.JSONException;
-import com.alibaba.fastjson.JSONObject;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
-import net.sf.json.JSONArray;
-import org.apache.log4j.Logger;
-
-import java.sql.*;
-import java.util.Properties;
-
-/**
- * Hive JDBC connection pool
- *
- * @author Colbert
- */
-public class HiveDataSourceUtil {
-    private static DruidDataSource hiveDataSource = new DruidDataSource();
-    public static Connection conn = null;
-    private static final Logger logger = Logger.getLogger(HiveDataSourceUtil.class);
-
-    public static DruidDataSource getHiveDataSource() {
-        if (hiveDataSource.isInited()) {
-            return hiveDataSource;
-        }
-
-        try {
-            Properties dsProp = new Properties();
-            dsProp.load(HiveDataSourceUtil.class.getClassLoader().getResourceAsStream("hive.properties"));
-            hiveDataSource.setDriverClassName(dsProp.getProperty("hive_jdbc_drivers"));
-            // basic properties: url, user, password
-            hiveDataSource.setUrl(dsProp.getProperty("hive_jdbc_url"));
-            hiveDataSource.setUsername(dsProp.getProperty("hive_jdbc_username"));
-            hiveDataSource.setPassword(dsProp.getProperty("hive_jdbc_password"));
-
-            // initial, minimum and maximum pool sizes
-            hiveDataSource.setInitialSize(Integer.parseInt(dsProp.getProperty("hive_initialSize")));
-            hiveDataSource.setMinIdle(Integer.parseInt(dsProp.getProperty("hive_minIdle")));
-            hiveDataSource.setMaxActive(Integer.parseInt(dsProp.getProperty("hive_maxActive")));
-
-            // maximum wait time when acquiring a connection
-            hiveDataSource.setMaxWait(Integer.parseInt(dsProp.getProperty("hive_maxWait")));
-
-            // interval, in milliseconds, between eviction runs that close idle connections
-            hiveDataSource.setTimeBetweenEvictionRunsMillis(60000);
-
-            // minimum time, in milliseconds, a connection stays in the pool
-            hiveDataSource.setMinEvictableIdleTimeMillis(300000);
-
-            hiveDataSource.setValidationQuery("SELECT 1");
-            hiveDataSource.setTestWhileIdle(true);
-            hiveDataSource.setTestOnBorrow(true);
-//            hiveDataSource.setKeepAlive(true);
-
-            // enable PSCache and set its size per connection
-            hiveDataSource.setPoolPreparedStatements(true);
-            hiveDataSource.setMaxPoolPreparedStatementPerConnectionSize(20);
-
-            // filters for monitoring statistics
-//            hiveDataSource.setFilters("stat");
-
-            hiveDataSource.init();
-        } catch (Exception e) {
-            e.printStackTrace();
-            closeHiveDataSource();
-        }
-        return hiveDataSource;
-    }
-
-    /**
-     * @Description: close the Hive connection pool
-     */
-    public static void closeHiveDataSource() {
-        if (hiveDataSource != null) {
-            hiveDataSource.close();
-        }
-    }
-
-    /**
-     * @return
-     * @Description: obtain a Hive connection
-     */
-    public static Connection getHiveConn() {
-        try {
-            hiveDataSource = getHiveDataSource();
-            conn = hiveDataSource.getConnection();
-        } catch (Exception e) {
-            logger.error("HiveDataSourceUtil--" + e + ": failed to obtain a Hive connection!");
-        }
-        return conn;
-    }
-
-    /**
-     * @Description: close the Hive data connection
-     */
-    public static void closeConn() {
-        try {
-            if (conn != null) {
-                conn.close();
-            }
-        } catch (Exception e) {
-            logger.error("HiveDataSourceUtil--" + e + ": failed to close the Hive conn connection!");
-        }
-    }
-
-
-    public static void main(String[] args) throws Exception {
-        DruidDataSource ds = HiveDataSourceUtil.getHiveDataSource();
-        Connection conn = ds.getConnection();
-        Statement stmt = null;
-        if (conn == null) {
-            System.out.println("null");
-        } else {
-            System.out.println("conn");
-            stmt = conn.createStatement();
-            ResultSet res = stmt.executeQuery("select * from test.frag_media_expire_log limit 10");
-            int i = 0;
-            while (res.next()) {
-                if (i < 10) {
-                    System.out.println(res.getString(2));
-                    i++;
-                }
-            }
-//            String s = resultSetToJson(res);
-//            String s = ResultSetToJsonString(res);
-//            System.out.println(s);
-        }
-
-        stmt.close();
-        conn.close();
-    }
-
-    public static String resultSetToJson(ResultSet rs) throws SQLException, JSONException {
-        // JSON array
-        JSONArray array = new JSONArray();
-
-        // get the column count
-        ResultSetMetaData metaData = rs.getMetaData();
-        int columnCount = metaData.getColumnCount();
-
-        // iterate over each row in the ResultSet
-        while (rs.next()) {
-            JSONObject jsonObj = new JSONObject();
-
-            // iterate over each column
-            for (int i = 1; i <= columnCount; i++) {
-                String columnName = metaData.getColumnLabel(i);
-                String value = rs.getString(columnName);
-                jsonObj.put(columnName, value);
-            }
-//            array.put(jsonObj);
-            array.add(jsonObj);
-        }
-
-        return array.toString();
-    }
-
-    public static final JsonObject ResultSetToJsonObject(ResultSet rs) {
-        JsonObject element = null;
-        JsonArray ja = new JsonArray();
-        JsonObject jo = new JsonObject();
-        ResultSetMetaData rsmd = null;
-        String columnName, columnValue = null;
-        try {
-            rsmd = rs.getMetaData();
-            while (rs.next()) {
-                element = new JsonObject();
-                for (int i = 0; i < rsmd.getColumnCount(); i++) {
-                    columnName = rsmd.getColumnName(i + 1);
-                    columnValue = rs.getString(columnName);
-                    element.addProperty(columnName, columnValue);
-                }
-                ja.add(element);
-            }
-            jo.add("result", ja);
-        } catch (SQLException e) {
-            e.printStackTrace();
-        }
-        return jo;
-    }
-
-    public static final String ResultSetToJsonString(ResultSet rs) {
-        return ResultSetToJsonObject(rs).toString();
-    }
-}
