Diffstat (limited to 'src')
-rw-r--r--  src/main/java/cn/ac/iie/bolt/SipInsertBoltDC.java              136
-rw-r--r--  src/main/java/cn/ac/iie/common/RealtimeCountConfig.java           6
-rw-r--r--  src/main/java/cn/ac/iie/utils/HiveDao/HdfsDataLoad_Avro.java    240
-rw-r--r--  src/main/java/cn/ac/iie/utils/HiveDao/HiveDataSourceUtil.java   187
4 files changed, 3 insertions, 566 deletions
diff --git a/src/main/java/cn/ac/iie/bolt/SipInsertBoltDC.java b/src/main/java/cn/ac/iie/bolt/SipInsertBoltDC.java
deleted file mode 100644
index d09243c..0000000
--- a/src/main/java/cn/ac/iie/bolt/SipInsertBoltDC.java
+++ /dev/null
@@ -1,136 +0,0 @@
-package cn.ac.iie.bolt;
-
-import cn.ac.iie.common.RealtimeCountConfig;
-import cn.ac.iie.dao.KafkaDB;
-import cn.ac.iie.utils.HiveDao.HdfsDataLoad_Avro;
-import cn.ac.iie.utils.TupleUtils;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.log4j.Logger;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-
-public class SipInsertBoltDC extends BaseBasicBolt {
-
- private static final long serialVersionUID = -6795251425357896415L;
- private static Logger logger = Logger.getLogger(SipInsertBoltDC.class);
-
- private LinkedList<String> sipOriJsonList;//holds raw SIP complement log strings
- private LinkedList<String> routeRelationJsonList;//holds VoIP route-relation log strings
-
- private HdfsDataLoad_Avro hdfsDataLoadAvro;
-
- private Integer tickFreqSecs;
-
- public SipInsertBoltDC(Integer tickFreqSecs) {
- this.tickFreqSecs = tickFreqSecs;
- }
-
- @SuppressWarnings("rawtypes")
- @Override
- public void prepare(Map stormConf,
- TopologyContext context) {
- hdfsDataLoadAvro = HdfsDataLoad_Avro.getHdfsInstance();
- sipOriJsonList = new LinkedList<String>();
- routeRelationJsonList = new LinkedList<String>();
- }
-
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- try {
- if (TupleUtils.isTick(tuple)) {
- long time = System.currentTimeMillis() / 1000L;
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
- String partition = sdf.format(new Date(time * 1000L)).replaceAll("-", "");//format: 20190722
- if (!sipOriJsonList.isEmpty()) {
- LinkedList<String> tmpListFreq = new LinkedList<String>();
- tmpListFreq.addAll(sipOriJsonList);
- sipOriJsonList.clear();
- hdfsDataLoadAvro.dataSipToHdfsAvro(partition, tmpListFreq, RealtimeCountConfig.KAFKA_SIP_COMPLEMENT_TOPIC, "origin", time);
- }
-
- //scheduled write of VoIP route-relation logs---deprecated 20190807, route relations are now obtained via offline Spark cleansing
-// if (!routeRelationJsonList.isEmpty()) {
-//// Map<String, Long> tmpMap = new HashMap<String, Long>();
-// LinkedList<String> tmpFragListFreq = new LinkedList<String>();
-// tmpFragListFreq.addAll(routeRelationJsonList);
-// routeRelationJsonList.clear();
-// kafkaDB.routeRelatLog2KafkaFromSipInsertBoltDC(tmpFragListFreq);
-//// dcl.dfPzFlowBatchStorage(tmpMap);//production path: writes directly to the central HTTP endpoint, verified working
-//// dbl.dfPzFlowBatchStorage2CH(tmpMap);//test write to ClickHouse 20190220
-//// dbl.dfPzFlowBatchStorage2CH(tmpFragListFreq);//test write to ClickHouse 20190220
-// }
- } else {
- String jsonLog = tuple.getString(0);
- String logType = tuple.getString(1);
- switch (logType) {
- case "origin":
- if (StringUtil.isNotBlank(jsonLog)) {
- sipOriJsonList.add(jsonLog);
- collector.emit(new Values(jsonLog));
- }
- break;
-// case "route"://存放路由关系数据---20190807废弃,路由关系转离线spark清洗获取
-// if (StringUtil.isNotBlank(jsonLog)) {
-// routeRelationJsonList.add(jsonLog);
-// }
-// break;
- default:
- logger.error("SipInsertBoltDC logType is error !!!This logType is--->{" + logType + "}<---");
- break;
- }
- long time = System.currentTimeMillis() / 1000L;
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
- String partition = sdf.format(new Date(time * 1000L)).replaceAll("-", "");
- if (sipOriJsonList.size() >= RealtimeCountConfig.BATCH_KAFKA_INSERT_NUM) {
- LinkedList<String> tmpList = new LinkedList<String>();
- tmpList.addAll(sipOriJsonList);
- sipOriJsonList.clear();
- hdfsDataLoadAvro.dataSipToHdfsAvro(partition, tmpList, RealtimeCountConfig.KAFKA_SIP_COMPLEMENT_TOPIC, "origin", time);
- }
- //write VoIP route-relation logs---deprecated 20190807, route relations are now obtained via offline Spark cleansing
-// if (routeRelationJsonList.size() >= RealtimeCountConfig.BATCH_KAFKA_INSERT_NUM) {
-// LinkedList<String> tmpRouteList = new LinkedList<String>();
-// tmpRouteList.addAll(routeRelationJsonList);
-// routeRelationJsonList.clear();
-//// dbl.dfPzFlowBatchStorage2CH(tmpRouteList);//test write to ClickHouse 20190220
-// kafkaDB.routeRelatLog2KafkaFromSipInsertBoltDC(tmpRouteList);
-// }
-
- }
- } catch (Exception e) {
- logger.error("SipInsertBoltDC to insert is error !!!--->{" + e + "}<---");
- e.printStackTrace();
- }
- }
-
- private void logCount(String key, Map<String, Long> hm) {
- if (hm.containsKey(key)) {
- hm.put(key, hm.get(key) + 1);
- } else {
- hm.put(key, 1l);
- }
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- Map<String, Object> conf = new HashMap<String, Object>();
- conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFreqSecs);
- return conf;
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("countJsonLog"));
- }
-}
\ No newline at end of file
diff --git a/src/main/java/cn/ac/iie/common/RealtimeCountConfig.java b/src/main/java/cn/ac/iie/common/RealtimeCountConfig.java
index 373a4a3..59f6c57 100644
--- a/src/main/java/cn/ac/iie/common/RealtimeCountConfig.java
+++ b/src/main/java/cn/ac/iie/common/RealtimeCountConfig.java
@@ -57,10 +57,11 @@ public class RealtimeCountConfig implements Serializable{
public static final String HDFS_PATH = RealtimeCountConfigurations.getStringProperty(0,"hdfs.path");
public static final String HDFS_USER = RealtimeCountConfigurations.getStringProperty(0,"hdfs.user");
// public static final String HIVE_URL = RealtimeCountConfigurations.getStringProperty(0,"hive.url");
-// public static final String HIVE_USERNAME = RealtimeCountConfigurations.getStringProperty(0,"hive.username");
-// public static final String HIVE_PASSWORD = RealtimeCountConfigurations.getStringProperty(0,"hive.password");
+ // public static final String HIVE_USERNAME = RealtimeCountConfigurations.getStringProperty(0,"hive.username");
+ // public static final String HIVE_PASSWORD = RealtimeCountConfigurations.getStringProperty(0,"hive.password");
public static final String HIVE_SIP_CLEAN_TABLE = RealtimeCountConfigurations.getStringProperty(0,"hive.sip.clean.table");
// public static final String HIVE_SIP_ROUTE_TABLE = RealtimeCountConfigurations.getStringProperty(0,"hive.sip.route.table");
+ public static final Integer PEAK_LOAD_SIP_PERCENT = RealtimeCountConfigurations.getIntProperty(0, "peak.load.sip.percent");
//---------------storm_config.properties---------------
public static final Integer SPOUT_PARALLELISM = RealtimeCountConfigurations.getIntProperty(1, "spout.parallelism");
@@ -77,7 +78,6 @@ public class RealtimeCountConfig implements Serializable{
public static final Integer TOPOLOGY_TICK_TUPLE_MERGE_FREQ_SECS = RealtimeCountConfigurations.getIntProperty(1, "topology.tick.tuple.merge.freq.secs");
public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = RealtimeCountConfigurations.getIntProperty(1, "topology.config.max.spout.pending");
public static final Integer TOPOLOGY_NUM_ACKS = RealtimeCountConfigurations.getIntProperty(1, "topology.num.acks");
- public static final Integer PEAK_LOAD_SIP_PERCENT = RealtimeCountConfigurations.getIntProperty(10, "peak.load.sip.percent");
//display configuration parameters
public static void configShow(){
diff --git a/src/main/java/cn/ac/iie/utils/HiveDao/HdfsDataLoad_Avro.java b/src/main/java/cn/ac/iie/utils/HiveDao/HdfsDataLoad_Avro.java
deleted file mode 100644
index 152bc4d..0000000
--- a/src/main/java/cn/ac/iie/utils/HiveDao/HdfsDataLoad_Avro.java
+++ /dev/null
@@ -1,240 +0,0 @@
-package cn.ac.iie.utils.HiveDao;
-
-import cn.ac.iie.bean.voipSipOrigin.SipOriginALL;
-import cn.ac.iie.common.RealtimeCountConfig;
-import com.alibaba.druid.pool.DruidDataSource;
-import com.alibaba.fastjson.JSONObject;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.avro.Schema;
-import org.apache.avro.file.CodecFactory;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-import java.net.URI;
-import java.sql.Connection;
-import java.sql.Statement;
-import java.util.LinkedList;
-import java.util.UUID;
-
-/**
- * Loader for HDFS/Hive data in Avro format
- *
- * @author Colbert
- */
-public class HdfsDataLoad_Avro {
-
- private static Logger logger = Logger.getLogger(HdfsDataLoad_Avro.class);
- private static DruidDataSource ds = HiveDataSourceUtil.getHiveDataSource();
- private Connection con = null;
- private Statement stmt = null;
- private FSDataOutputStream outputStream_avro = null;
-
- private DataFileWriter<GenericRecord> fileWriter = null;
-
- private static HdfsDataLoad_Avro hdfsDataLoad_avro = null;
- private static FileSystem fileSystem = null;
-
- private HdfsDataLoad_Avro() {
- getHdfsConnection();
- }
-
- public static HdfsDataLoad_Avro getHdfsInstance() {
- if (hdfsDataLoad_avro == null || fileSystem == null) {
- hdfsDataLoad_avro = new HdfsDataLoad_Avro();
- }
- return hdfsDataLoad_avro;
- }
-
- /**
- * Create the HDFS connection
- */
- private void getHdfsConnection() {
- try {
- Configuration conf = new Configuration();
- conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
- conf.setBoolean("dfs.support.append", true);
- conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
- conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", true);
- fileSystem = FileSystem.get(new URI(RealtimeCountConfig.HDFS_URL), conf, RealtimeCountConfig.HDFS_USER);
- } catch (Exception e) {
- logger.error("HdfsDataLoad_Avro getHdfsConnection method is error !!!--->{" + e + "}<---");
- e.printStackTrace();
- }
- }
-
-
- public void dataSipToHdfsAvro(String partition, LinkedList<String> data, String topicName, String logType, Long timeSend) {
- long time = timeSend;
- String uuid = UUID.randomUUID().toString().replaceAll("-", "").toLowerCase();
- String hdfs_path = RealtimeCountConfig.HDFS_URL + RealtimeCountConfig.HDFS_PATH + topicName.toLowerCase() + "/" + partition + "/" + topicName.toLowerCase() + "-" + uuid + ".avro";//format: hdfs://ns1/input/frag-media-expire-log/20190730/frag-media-expire-log-d84772e8257048f3be1ca82f8e35f215.avro
- try {
- Path path = new Path(hdfs_path);
- fileSystem.createNewFile(path);
- outputStream_avro = fileSystem.append(path);
-
- switch (logType) {
- case "origin":
- String schemaJsonSip = "{\"type\": \"record\",\"name\": \"siplog\",\"fields\": [{\"name\": \"call_id\", \"type\": [\"string\", \"null\"]},{\"name\": \"clj_ip\", \"type\": [\"string\", \"null\"]},{\"name\": \"found_time\", \"type\": \"int\"},{\"name\": \"src_ip\", \"type\": [\"string\", \"null\"]},{\"name\": \"src_location_nation\", \"type\": [\"string\", \"null\"]},{\"name\": \"src_location_nation_code\", \"type\": [\"string\", \"null\"]},{\"name\": \"src_location_region\", \"type\": [\"string\", \"null\"]},{\"name\": \"src_port\", \"type\": \"int\"},{\"name\": \"dst_ip\", \"type\": [\"string\", \"null\"]},{\"name\": \"ip_type\", \"type\": [\"string\", \"null\"]},{\"name\": \"dst_location_nation\", \"type\": [\"string\", \"null\"]},{\"name\": \"dst_location_nation_code\", \"type\": [\"string\", \"null\"]},{\"name\": \"dst_location_region\", \"type\": [\"string\", \"null\"]},{\"name\": \"dst_port\", \"type\": \"int\"},{\"name\": \"method\", \"type\": [\"string\", \"null\"]},{\"name\": \"request_uri\", \"type\": [\"string\", \"null\"]},{\"name\": \"user_name\", \"type\": [\"string\", \"null\"]},{\"name\": \"service_domain\", \"type\": [\"string\", \"null\"]},{\"name\": \"service_domain_valid\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_stat\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_stat_format\", \"type\": [\"string\", \"null\"]},{\"name\": \"from\", \"type\": [\"string\", \"null\"]},{\"name\": \"from_nickname\", \"type\": [\"string\", \"null\"]},{\"name\": \"from_usr_name\", \"type\": [\"string\", \"null\"]},{\"name\": \"from_ser_domain\", \"type\": [\"string\", \"null\"]},{\"name\": \"from_ser_domain_valid\", \"type\": [\"string\", \"null\"]},{\"name\": \"from_tag\", \"type\": [\"string\", \"null\"]},{\"name\": \"to\", \"type\": [\"string\", \"null\"]},{\"name\": \"to_nickname\", \"type\": [\"string\", \"null\"]},{\"name\": \"to_usr_name\", \"type\": [\"string\", \"null\"]},{\"name\": \"to_ser_domain\", \"type\": [\"string\", \"null\"]},{\"name\": \"to_ser_domain_valid\", \"type\": [\"string\", \"null\"]},{\"name\": \"to_tag\", \"type\": [\"string\", \"null\"]},{\"name\": \"cseq\", \"type\": [\"string\", \"null\"]},{\"name\": \"cseq_method\", \"type\": [\"string\", \"null\"]},{\"name\": \"user_agent\", \"type\": [\"string\", \"null\"]},{\"name\": \"device_type\", \"type\": [\"string\", \"null\"]},{\"name\": \"max_forwards\", \"type\": [\"string\", \"null\"]},{\"name\": \"server\", \"type\": [\"string\", \"null\"]},{\"name\": \"server_type\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_via_json\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_contact\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_contact_nickname\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_contact_usr_name\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_contact_ser_domain\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_ser_domain_valid\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_record_route_json\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_route_json\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_expires\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_others\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_content_type\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_content\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_via_json\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_contact\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_contact_nickname\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_contact_usr_name\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_contact_ser_domain\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_ser_domain_valid\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_record_route_json\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_route_json\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_expires\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_others\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_content_type\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_content\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_coding\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_coding\", \"type\": [\"string\", \"null\"]},{\"name\": \"stat_time\", \"type\": \"int\"}]}";
- Schema schemaSip = new Schema.Parser().parse(schemaJsonSip);
- fileWriter = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schemaSip)).setSyncInterval(100);
- fileWriter.setCodec(CodecFactory.snappyCodec());
- fileWriter.create(schemaSip, outputStream_avro);
- for (String dataJson : data) {
- SipOriginALL sipOriginAllLog = JSONObject.parseObject(dataJson, SipOriginALL.class);
- sipOriginAllLog.setStat_time(time);
- GenericRecord recordSip = new GenericData.Record(schemaSip);
-
- recordSip.put("call_id", sipOriginAllLog.getCall_ID());
- recordSip.put("clj_ip", sipOriginAllLog.getCLJ_IP());
- recordSip.put("found_time", sipOriginAllLog.getFound_Time());
- recordSip.put("src_ip", sipOriginAllLog.getSRC_IP());
- recordSip.put("src_location_nation", sipOriginAllLog.getSRC_LOCATION_NATION());
- recordSip.put("src_location_nation_code", sipOriginAllLog.getSRC_LOCATION_NATION_CODE());
- recordSip.put("src_location_region", sipOriginAllLog.getSRC_LOCATION_REGION());
- recordSip.put("src_port", sipOriginAllLog.getSRC_PORT());
- recordSip.put("dst_ip", sipOriginAllLog.getDST_IP());
- recordSip.put("ip_type", sipOriginAllLog.getIP_TYPE());
- recordSip.put("dst_location_nation", sipOriginAllLog.getDST_LOCATION_NATION());
- recordSip.put("dst_location_nation_code", sipOriginAllLog.getDST_LOCATION_NATION_CODE());
- recordSip.put("dst_location_region", sipOriginAllLog.getDST_LOCATION_REGION());
- recordSip.put("dst_port", sipOriginAllLog.getDST_PORT());
- recordSip.put("method", sipOriginAllLog.getMethod());
- recordSip.put("request_uri", sipOriginAllLog.getRequest_URI());
- recordSip.put("user_name", sipOriginAllLog.getUser_name());
- recordSip.put("service_domain", sipOriginAllLog.getService_domain());
- recordSip.put("service_domain_valid", sipOriginAllLog.getService_domain_valid());
- recordSip.put("res_stat", sipOriginAllLog.getRes_stat());
- recordSip.put("res_stat_format", sipOriginAllLog.getRes_stat_format());
- recordSip.put("from", sipOriginAllLog.getFrom());
- recordSip.put("from_nickname", sipOriginAllLog.getFrom_Nickname());
- recordSip.put("from_usr_name", sipOriginAllLog.getFrom_usr_name());
- recordSip.put("from_ser_domain", sipOriginAllLog.getFrom_ser_domain());
- recordSip.put("from_ser_domain_valid", sipOriginAllLog.getFrom_ser_domain_valid());
- recordSip.put("from_tag", sipOriginAllLog.getFrom_tag());
- recordSip.put("to", sipOriginAllLog.getTo());
- recordSip.put("to_nickname", sipOriginAllLog.getTo_Nickname());
- recordSip.put("to_usr_name", sipOriginAllLog.getTo_usr_name());
- recordSip.put("to_ser_domain", sipOriginAllLog.getTo_ser_domain());
- recordSip.put("to_ser_domain_valid", sipOriginAllLog.getTo_ser_domain_valid());
- recordSip.put("to_tag", sipOriginAllLog.getTo_tag());
- recordSip.put("cseq", sipOriginAllLog.getCseq());
- recordSip.put("cseq_method", sipOriginAllLog.getCseq_method());
- recordSip.put("user_agent", sipOriginAllLog.getUser_Agent());
- recordSip.put("device_type", sipOriginAllLog.getDevice_type());
- recordSip.put("max_forwards", sipOriginAllLog.getMax_Forwards());
- recordSip.put("server", sipOriginAllLog.getServer());
- recordSip.put("server_type", sipOriginAllLog.getServer_type());
- recordSip.put("req_via_json", sipOriginAllLog.getReq_Via_Json());
- recordSip.put("req_contact", sipOriginAllLog.getReq_Contact());
- recordSip.put("req_contact_nickname", sipOriginAllLog.getReq_Contact_Nickname());
- recordSip.put("req_contact_usr_name", sipOriginAllLog.getReq_Contact_usr_name());
- recordSip.put("req_contact_ser_domain", sipOriginAllLog.getReq_Contact_ser_domain());
- recordSip.put("req_ser_domain_valid", sipOriginAllLog.getReq_ser_domain_valid());
- recordSip.put("req_record_route_json", sipOriginAllLog.getReq_Record_Route_Json());
- recordSip.put("req_route_json", sipOriginAllLog.getReq_Route_Json());
- recordSip.put("req_expires", sipOriginAllLog.getReq_Expires());
- recordSip.put("req_others", sipOriginAllLog.getReq_Others());
- recordSip.put("req_content_type", sipOriginAllLog.getReq_Content_Type());
- recordSip.put("req_content", sipOriginAllLog.getReq_Content());
- recordSip.put("res_via_json", sipOriginAllLog.getRes_Via_Json());
- recordSip.put("res_contact", sipOriginAllLog.getRes_Contact());
- recordSip.put("res_contact_nickname", sipOriginAllLog.getRes_Contact_Nickname());
- recordSip.put("res_contact_usr_name", sipOriginAllLog.getRes_Contact_usr_name());
- recordSip.put("res_contact_ser_domain", sipOriginAllLog.getRes_Contact_ser_domain());
- recordSip.put("res_ser_domain_valid", sipOriginAllLog.getRes_ser_domain_valid());
- recordSip.put("res_record_route_json", sipOriginAllLog.getRes_Record_Route_Json());
- recordSip.put("res_route_json", sipOriginAllLog.getRes_Route_Json());
- recordSip.put("res_expires", sipOriginAllLog.getRes_Expires());
- recordSip.put("res_others", sipOriginAllLog.getRes_Others());
- recordSip.put("res_content_type", sipOriginAllLog.getRes_Content_Type());
- recordSip.put("res_content", sipOriginAllLog.getRes_Content());
- recordSip.put("req_coding", sipOriginAllLog.getReq_coding());
- recordSip.put("res_coding", sipOriginAllLog.getRes_coding());
- recordSip.put("stat_time", sipOriginAllLog.getStat_time());
-
- fileWriter.append(recordSip);
- fileWriter.flush();
- }
- break;
- default:
- logger.error("HdfsDataLoad_Avro toHdfs logType is error !!!This logType is--->{" + logType + "}<---");
- break;
- }
-
- logger.warn("HdfsDataLoad_Avro data to HDFS Successful,hdfs_path is -->{" + hdfs_path + "}<---");
-
- if (fileWriter != null) {
- fileWriter.close();
- fileWriter = null;
- }
- if (outputStream_avro != null) {
- outputStream_avro.close();
- outputStream_avro = null;
- }
-
- switch (logType) {
- case "origin":
- String tablenameSip = RealtimeCountConfig.HIVE_SIP_CLEAN_TABLE;
- loadDataToHiveAvro(tablenameSip, partition, hdfs_path);
- break;
-// case "route":
-// String tablenameFrag = RealtimeCountConfig.HIVE_SIP_ROUTE_TABLE;
-// loadDataToHiveAvro(tablenameFrag, partition, hdfs_path);
-// break;
- default:
- logger.error("HdfsDataLoad_Avro toHive logType is error !!!This logType is--->{" + logType + "}<---");
- break;
- }
-
- } catch (Exception e) {
- logger.error("HdfsDataLoad_Avro dataToHdfs method is error !!!--->{" + e + "}<---");
- e.printStackTrace();
- }
- }
-
- public void loadDataToHiveAvro(String tablename, String partition, String hdfs_path) {
- String dataUrl = hdfs_path;
- StringBuffer sb = new StringBuffer();
- try {
- con = ds.getConnection();
- stmt = con.createStatement();
- sb.append("load data inpath ").append("'").append(dataUrl).append("'")
- .append(" into table ").append(tablename).append(" partition( time_partition=").append(partition).append(")");
- stmt.execute(sb.toString());
-
- logger.warn("HdfsDataLoad_Avro data to Hive Successful,dataUrl is -->{" + dataUrl + "}<---");
-
- } catch (Exception e) {
- logger.error("HdfsDataLoad_Avro loadDataToHive method is error !!!--->{" + e + "}<---");
- e.printStackTrace();
- } finally {
- try {
- if (stmt != null) {
- stmt.close();
- stmt = null;
- }
- if (con != null) {
- con.close();
- con = null;
- }
- } catch (Exception e) {
- logger.error("HdfsDataLoad_Avro loadDataToHive when close is error !!!--->{" + e + "}<---");
- e.printStackTrace();
- }
- }
- }
-
- private String strDefaultValue(String str) {
- return (StringUtil.isBlank(str)
- || str == null
- || str.equals("")
- || str.length() == 0) ? "$#$" : str;
- }
-}
diff --git a/src/main/java/cn/ac/iie/utils/HiveDao/HiveDataSourceUtil.java b/src/main/java/cn/ac/iie/utils/HiveDao/HiveDataSourceUtil.java
deleted file mode 100644
index 9223f92..0000000
--- a/src/main/java/cn/ac/iie/utils/HiveDao/HiveDataSourceUtil.java
+++ /dev/null
@@ -1,187 +0,0 @@
-package cn.ac.iie.utils.HiveDao;
-
-import com.alibaba.druid.pool.DruidDataSource;
-import com.alibaba.fastjson.JSONException;
-import com.alibaba.fastjson.JSONObject;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
-import net.sf.json.JSONArray;
-import org.apache.log4j.Logger;
-
-import java.sql.*;
-import java.util.Properties;
-
-/**
- * Hive JDBC connection pool
- *
- * @author Colbert
- */
-public class HiveDataSourceUtil {
- private static DruidDataSource hiveDataSource = new DruidDataSource();
- public static Connection conn = null;
- private static final Logger logger = Logger.getLogger(HiveDataSourceUtil.class);
-
- public static DruidDataSource getHiveDataSource() {
- if (hiveDataSource.isInited()) {
- return hiveDataSource;
- }
-
- try {
- Properties dsProp = new Properties();
- dsProp.load(HiveDataSourceUtil.class.getClassLoader().getResourceAsStream("hive.properties"));
- hiveDataSource.setDriverClassName(dsProp.getProperty("hive_jdbc_drivers"));
- //basic properties: url, user, password
- hiveDataSource.setUrl(dsProp.getProperty("hive_jdbc_url"));
- hiveDataSource.setUsername(dsProp.getProperty("hive_jdbc_username"));
- hiveDataSource.setPassword(dsProp.getProperty("hive_jdbc_password"));
-
- //configure initial, minimum, and maximum pool size
- hiveDataSource.setInitialSize(Integer.parseInt(dsProp.getProperty("hive_initialSize")));
- hiveDataSource.setMinIdle(Integer.parseInt(dsProp.getProperty("hive_minIdle")));
- hiveDataSource.setMaxActive(Integer.parseInt(dsProp.getProperty("hive_maxActive")));
-
- //configure the connection acquisition timeout
- hiveDataSource.setMaxWait(Integer.parseInt(dsProp.getProperty("hive_maxWait")));
-
- //configure how often to scan for idle connections that should be closed, in milliseconds
- hiveDataSource.setTimeBetweenEvictionRunsMillis(60000);
-
- //configure the minimum lifetime of a connection in the pool, in milliseconds
- hiveDataSource.setMinEvictableIdleTimeMillis(300000);
-
- hiveDataSource.setValidationQuery("SELECT 1");
- hiveDataSource.setTestWhileIdle(true);
- hiveDataSource.setTestOnBorrow(true);
-// hiveDataSource.setKeepAlive(true);
-
- //enable PSCache and set the PSCache size per connection
- hiveDataSource.setPoolPreparedStatements(true);
- hiveDataSource.setMaxPoolPreparedStatementPerConnectionSize(20);
-
- //configure the filters used for monitoring/statistics interception
-// hiveDataSource.setFilters("stat");
-
- hiveDataSource.init();
- } catch (Exception e) {
- e.printStackTrace();
- closeHiveDataSource();
- }
- return hiveDataSource;
- }
-
- /**
- * @Description: close the Hive connection pool
- */
- public static void closeHiveDataSource() {
- if (hiveDataSource != null) {
- hiveDataSource.close();
- }
- }
-
- /**
- * @return
- * @Description: obtain a Hive connection
- */
- public static Connection getHiveConn() {
- try {
- hiveDataSource = getHiveDataSource();
- conn = hiveDataSource.getConnection();
- } catch (Exception e) {
- logger.error("HiveDataSourceUtil--" + e + ":获取Hive连接失败!");
- }
- return conn;
- }
-
- /**
- * @Description: close the Hive data connection
- */
- public static void closeConn() {
- try {
- if (conn != null) {
- conn.close();
- }
- } catch (Exception e) {
- logger.error("HiveDataSourceUtil--" + e + ":关闭Hive-conn连接失败!");
- }
- }
-
-
- public static void main(String[] args) throws Exception {
- DruidDataSource ds = HiveDataSourceUtil.getHiveDataSource();
- Connection conn = ds.getConnection();
- Statement stmt = null;
- if (conn == null) {
- System.out.println("null");
- } else {
- System.out.println("conn");
- stmt = conn.createStatement();
- ResultSet res = stmt.executeQuery("select * from test.frag_media_expire_log limit 10");
- int i = 0;
- while (res.next()) {
- if (i < 10) {
- System.out.println(res.getString(2));
- i++;
- }
- }
-// String s = resultSetToJson(res);
-// String s = ResultSetToJsonString(res);
-// System.out.println(s);
- }
-
- stmt.close();
- conn.close();
- }
-
- public static String resultSetToJson(ResultSet rs) throws SQLException, JSONException {
- // JSON array
- JSONArray array = new JSONArray();
-
- // get the column count
- ResultSetMetaData metaData = rs.getMetaData();
- int columnCount = metaData.getColumnCount();
-
- // iterate over each row in the ResultSet
- while (rs.next()) {
- JSONObject jsonObj = new JSONObject();
-
- // iterate over each column
- for (int i = 1; i <= columnCount; i++) {
- String columnName = metaData.getColumnLabel(i);
- String value = rs.getString(columnName);
- jsonObj.put(columnName, value);
- }
-// array.put(jsonObj);
- array.add(jsonObj);
- }
-
- return array.toString();
- }
-
- public static final JsonObject ResultSetToJsonObject(ResultSet rs) {
- JsonObject element = null;
- JsonArray ja = new JsonArray();
- JsonObject jo = new JsonObject();
- ResultSetMetaData rsmd = null;
- String columnName, columnValue = null;
- try {
- rsmd = rs.getMetaData();
- while (rs.next()) {
- element = new JsonObject();
- for (int i = 0; i < rsmd.getColumnCount(); i++) {
- columnName = rsmd.getColumnName(i + 1);
- columnValue = rs.getString(columnName);
- element.addProperty(columnName, columnValue);
- }
- ja.add(element);
- }
- jo.add("result", ja);
- } catch (SQLException e) {
- e.printStackTrace();
- }
- return jo;
- }
-
- public static final String ResultSetToJsonString(ResultSet rs) {
- return ResultSetToJsonObject(rs).toString();
- }
-}