summaryrefslogtreecommitdiff
path: root/src/main/java/cn/ac/iie/utils
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/cn/ac/iie/utils')
-rw-r--r--src/main/java/cn/ac/iie/utils/CSVAlarm.java77
-rw-r--r--src/main/java/cn/ac/iie/utils/HiveDao/HdfsDataLoad_Avro.java240
-rw-r--r--src/main/java/cn/ac/iie/utils/HiveDao/HiveDataSourceUtil.java187
-rw-r--r--src/main/java/cn/ac/iie/utils/IPIPLibrary/Ipip.java189
-rw-r--r--src/main/java/cn/ac/iie/utils/RealtimeCountConfigurations.java67
-rw-r--r--src/main/java/cn/ac/iie/utils/TupleUtils.java13
-rw-r--r--src/main/java/cn/ac/iie/utils/dao/ClickHouseUtils.java47
-rw-r--r--src/main/java/cn/ac/iie/utils/getjson/GetJsonToKafkaUtils.java835
-rw-r--r--src/main/java/cn/ac/iie/utils/getjson/GetStrToClickHouseUtils.java585
-rw-r--r--src/main/java/cn/ac/iie/utils/redis/RedisPollUtils.java61
-rw-r--r--src/main/java/cn/ac/iie/utils/redis/RedisUrlPollUtils.java61
11 files changed, 2362 insertions, 0 deletions
diff --git a/src/main/java/cn/ac/iie/utils/CSVAlarm.java b/src/main/java/cn/ac/iie/utils/CSVAlarm.java
new file mode 100644
index 0000000..fd64b5f
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/CSVAlarm.java
@@ -0,0 +1,77 @@
+package cn.ac.iie.utils;
+
+import cn.ac.iie.common.HttpManager;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.util.EntityUtils;
+import org.apache.log4j.Logger;
+
+public class CSVAlarm {
+ //类初始化时,自动实例化,饿汉单例模式
+ private static final CSVAlarm csvAlarm = new CSVAlarm();
+ private static Logger logger = Logger.getLogger(CSVAlarm.class);
+
+ public static CSVAlarm getInfoLoadInstance(){
+ return csvAlarm;
+ }
+
+ //私有构造方法,防止外部构建
+ private CSVAlarm(){
+ }
+
+ public void csvDataLoad(String url, String topicNmane, String data) throws Exception {
+ // http client
+ DefaultHttpClient httpClient = new DefaultHttpClient();
+ String topic = topicNmane;
+ try {
+ // 需要发送的数据
+ String msg = data;//每条数据内部"\t"分割,数据间"\n"分割;//自己又换了分隔符:每条数据内部"#"分割,数据间"$"分割;
+ //开始结束时间
+// long start = System.currentTimeMillis();
+ HttpPost request = new HttpPost(url);
+ //用户名与密码,用于权限控制,传输时加密
+// request.addHeader("User", "LiMing");
+// request.addHeader("Password", "123");
+ //指定使用topic 自动绑定对应schema
+ request.addHeader("Topic", topic);
+ //Schema-Version可选,不填或为空默认使用最新版本的schema
+ request.addHeader("Schema-Version", "2");
+ //csv 或者 avro,大小写不敏感
+ request.addHeader("Format", "csv");
+ //行列分隔符默认为下值
+// request.addHeader("Row-Split", "\\n");
+// request.addHeader("Field-Split", "\\t");
+ request.addHeader("Row-Split", "\\n");
+ request.addHeader("Field-Split", ",");
+ StringEntity payload = new StringEntity(msg);
+ request.setEntity(payload);
+ HttpResponse response = httpClient.execute(request);
+ try {
+ int statuCode = response.getStatusLine().getStatusCode();
+ HttpEntity entity = response.getEntity();
+ if (statuCode == 200) {
+ logger.info("数据中心加载成功, 返回码: " + statuCode);
+ System.out.println("数据中心加载成功, 返回码: " + statuCode);
+ EntityUtils.consume(entity);
+ } else {
+ String ret = EntityUtils.toString(entity);
+ EntityUtils.consume(entity);
+ logger.info("数据中心加载失败: " + ret + " --- code: " + statuCode + " ---失败数据为: \n" + data);
+ System.out.println("数据中心加载失败: " + ret + " --- code: " + statuCode + " ---失败数据为: \n" + data);
+ logger.error("数据中心加载失败: " + ret + " --- code: " + statuCode);
+ System.out.println("数据中心加载失败: " + ret + " --- code: " + statuCode);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ } catch (Exception ex) {
+ // handle exception
+ ex.printStackTrace();
+ } finally {
+ httpClient.getConnectionManager().shutdown(); //Deprecated
+ }
+ }
+} \ No newline at end of file
diff --git a/src/main/java/cn/ac/iie/utils/HiveDao/HdfsDataLoad_Avro.java b/src/main/java/cn/ac/iie/utils/HiveDao/HdfsDataLoad_Avro.java
new file mode 100644
index 0000000..152bc4d
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/HiveDao/HdfsDataLoad_Avro.java
@@ -0,0 +1,240 @@
+package cn.ac.iie.utils.HiveDao;
+
+import cn.ac.iie.bean.voipSipOrigin.SipOriginALL;
+import cn.ac.iie.common.RealtimeCountConfig;
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+import java.net.URI;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.LinkedList;
+import java.util.UUID;
+
+/**
+ * HDFS-Hive-Avro格式载入类
+ *
+ * @author Colbert
+ */
+public class HdfsDataLoad_Avro {
+
+ private static Logger logger = Logger.getLogger(HdfsDataLoad_Avro.class);
+ private static DruidDataSource ds = HiveDataSourceUtil.getHiveDataSource();
+ private Connection con = null;
+ private Statement stmt = null;
+ private FSDataOutputStream outputStream_avro = null;
+
+ private DataFileWriter<GenericRecord> fileWriter = null;
+
+ private static HdfsDataLoad_Avro hdfsDataLoad_avro = null;
+ private static FileSystem fileSystem = null;
+
+ private HdfsDataLoad_Avro() {
+ getHdfsConnection();
+ }
+
+ public static HdfsDataLoad_Avro getHdfsInstance() {
+ if (hdfsDataLoad_avro == null || fileSystem == null) {
+ hdfsDataLoad_avro = new HdfsDataLoad_Avro();
+ }
+ return hdfsDataLoad_avro;
+ }
+
+ /**
+ * 创建HDFS连接
+ */
+ private void getHdfsConnection() {
+ try {
+ Configuration conf = new Configuration();
+ conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
+ conf.setBoolean("dfs.support.append", true);
+ conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
+ conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", true);
+ fileSystem = FileSystem.get(new URI(RealtimeCountConfig.HDFS_URL), conf, RealtimeCountConfig.HDFS_USER);
+ } catch (Exception e) {
+ logger.error("HdfsDataLoad_Avro getHdfsConnection method is error !!!--->{" + e + "}<---");
+ e.printStackTrace();
+ }
+ }
+
+
+ public void dataSipToHdfsAvro(String partition, LinkedList<String> data, String topicName, String logType, Long timeSend) {
+ long time = timeSend;
+ String uuid = UUID.randomUUID().toString().replaceAll("-", "").toLowerCase();
+ String hdfs_path = RealtimeCountConfig.HDFS_URL + RealtimeCountConfig.HDFS_PATH + topicName.toLowerCase() + "/" + partition + "/" + topicName.toLowerCase() + "-" + uuid + ".avro";//格式 hdfs://ns1/input/frag-media-expire-log/20190730/frag-media-expire-log-d84772e8257048f3be1ca82f8e35f215.avro
+ try {
+ Path path = new Path(hdfs_path);
+ fileSystem.createNewFile(path);
+ outputStream_avro = fileSystem.append(path);
+
+ switch (logType) {
+ case "origin":
+ String schemaJsonSip = "{\"type\": \"record\",\"name\": \"siplog\",\"fields\": [{\"name\": \"call_id\", \"type\": [\"string\", \"null\"]},{\"name\": \"clj_ip\", \"type\": [\"string\", \"null\"]},{\"name\": \"found_time\", \"type\": \"int\"},{\"name\": \"src_ip\", \"type\": [\"string\", \"null\"]},{\"name\": \"src_location_nation\", \"type\": [\"string\", \"null\"]},{\"name\": \"src_location_nation_code\", \"type\": [\"string\", \"null\"]},{\"name\": \"src_location_region\", \"type\": [\"string\", \"null\"]},{\"name\": \"src_port\", \"type\": \"int\"},{\"name\": \"dst_ip\", \"type\": [\"string\", \"null\"]},{\"name\": \"ip_type\", \"type\": [\"string\", \"null\"]},{\"name\": \"dst_location_nation\", \"type\": [\"string\", \"null\"]},{\"name\": \"dst_location_nation_code\", \"type\": [\"string\", \"null\"]},{\"name\": \"dst_location_region\", \"type\": [\"string\", \"null\"]},{\"name\": \"dst_port\", \"type\": \"int\"},{\"name\": \"method\", \"type\": [\"string\", \"null\"]},{\"name\": \"request_uri\", \"type\": [\"string\", \"null\"]},{\"name\": \"user_name\", \"type\": [\"string\", \"null\"]},{\"name\": \"service_domain\", \"type\": [\"string\", \"null\"]},{\"name\": \"service_domain_valid\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_stat\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_stat_format\", \"type\": [\"string\", \"null\"]},{\"name\": \"from\", \"type\": [\"string\", \"null\"]},{\"name\": \"from_nickname\", \"type\": [\"string\", \"null\"]},{\"name\": \"from_usr_name\", \"type\": [\"string\", \"null\"]},{\"name\": \"from_ser_domain\", \"type\": [\"string\", \"null\"]},{\"name\": \"from_ser_domain_valid\", \"type\": [\"string\", \"null\"]},{\"name\": \"from_tag\", \"type\": [\"string\", \"null\"]},{\"name\": \"to\", \"type\": [\"string\", \"null\"]},{\"name\": \"to_nickname\", \"type\": [\"string\", \"null\"]},{\"name\": \"to_usr_name\", \"type\": [\"string\", \"null\"]},{\"name\": \"to_ser_domain\", \"type\": [\"string\", \"null\"]},{\"name\": \"to_ser_domain_valid\", \"type\": [\"string\", \"null\"]},{\"name\": \"to_tag\", \"type\": [\"string\", \"null\"]},{\"name\": \"cseq\", \"type\": [\"string\", \"null\"]},{\"name\": \"cseq_method\", \"type\": [\"string\", \"null\"]},{\"name\": \"user_agent\", \"type\": [\"string\", \"null\"]},{\"name\": \"device_type\", \"type\": [\"string\", \"null\"]},{\"name\": \"max_forwards\", \"type\": [\"string\", \"null\"]},{\"name\": \"server\", \"type\": [\"string\", \"null\"]},{\"name\": \"server_type\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_via_json\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_contact\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_contact_nickname\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_contact_usr_name\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_contact_ser_domain\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_ser_domain_valid\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_record_route_json\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_route_json\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_expires\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_others\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_content_type\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_content\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_via_json\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_contact\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_contact_nickname\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_contact_usr_name\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_contact_ser_domain\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_ser_domain_valid\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_record_route_json\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_route_json\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_expires\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_others\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_content_type\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_content\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_coding\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_coding\", \"type\": [\"string\", \"null\"]},{\"name\": \"stat_time\", \"type\": \"int\"}]}";
+ Schema schemaSip = new Schema.Parser().parse(schemaJsonSip);
+ fileWriter = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schemaSip)).setSyncInterval(100);
+ fileWriter.setCodec(CodecFactory.snappyCodec());
+ fileWriter.create(schemaSip, outputStream_avro);
+ for (String dataJson : data) {
+ SipOriginALL sipOriginAllLog = JSONObject.parseObject(dataJson, SipOriginALL.class);
+ sipOriginAllLog.setStat_time(time);
+ GenericRecord recordSip = new GenericData.Record(schemaSip);
+
+ recordSip.put("call_id", sipOriginAllLog.getCall_ID());
+ recordSip.put("clj_ip", sipOriginAllLog.getCLJ_IP());
+ recordSip.put("found_time", sipOriginAllLog.getFound_Time());
+ recordSip.put("src_ip", sipOriginAllLog.getSRC_IP());
+ recordSip.put("src_location_nation", sipOriginAllLog.getSRC_LOCATION_NATION());
+ recordSip.put("src_location_nation_code", sipOriginAllLog.getSRC_LOCATION_NATION_CODE());
+ recordSip.put("src_location_region", sipOriginAllLog.getSRC_LOCATION_REGION());
+ recordSip.put("src_port", sipOriginAllLog.getSRC_PORT());
+ recordSip.put("dst_ip", sipOriginAllLog.getDST_IP());
+ recordSip.put("ip_type", sipOriginAllLog.getIP_TYPE());
+ recordSip.put("dst_location_nation", sipOriginAllLog.getDST_LOCATION_NATION());
+ recordSip.put("dst_location_nation_code", sipOriginAllLog.getDST_LOCATION_NATION_CODE());
+ recordSip.put("dst_location_region", sipOriginAllLog.getDST_LOCATION_REGION());
+ recordSip.put("dst_port", sipOriginAllLog.getDST_PORT());
+ recordSip.put("method", sipOriginAllLog.getMethod());
+ recordSip.put("request_uri", sipOriginAllLog.getRequest_URI());
+ recordSip.put("user_name", sipOriginAllLog.getUser_name());
+ recordSip.put("service_domain", sipOriginAllLog.getService_domain());
+ recordSip.put("service_domain_valid", sipOriginAllLog.getService_domain_valid());
+ recordSip.put("res_stat", sipOriginAllLog.getRes_stat());
+ recordSip.put("res_stat_format", sipOriginAllLog.getRes_stat_format());
+ recordSip.put("from", sipOriginAllLog.getFrom());
+ recordSip.put("from_nickname", sipOriginAllLog.getFrom_Nickname());
+ recordSip.put("from_usr_name", sipOriginAllLog.getFrom_usr_name());
+ recordSip.put("from_ser_domain", sipOriginAllLog.getFrom_ser_domain());
+ recordSip.put("from_ser_domain_valid", sipOriginAllLog.getFrom_ser_domain_valid());
+ recordSip.put("from_tag", sipOriginAllLog.getFrom_tag());
+ recordSip.put("to", sipOriginAllLog.getTo());
+ recordSip.put("to_nickname", sipOriginAllLog.getTo_Nickname());
+ recordSip.put("to_usr_name", sipOriginAllLog.getTo_usr_name());
+ recordSip.put("to_ser_domain", sipOriginAllLog.getTo_ser_domain());
+ recordSip.put("to_ser_domain_valid", sipOriginAllLog.getTo_ser_domain_valid());
+ recordSip.put("to_tag", sipOriginAllLog.getTo_tag());
+ recordSip.put("cseq", sipOriginAllLog.getCseq());
+ recordSip.put("cseq_method", sipOriginAllLog.getCseq_method());
+ recordSip.put("user_agent", sipOriginAllLog.getUser_Agent());
+ recordSip.put("device_type", sipOriginAllLog.getDevice_type());
+ recordSip.put("max_forwards", sipOriginAllLog.getMax_Forwards());
+ recordSip.put("server", sipOriginAllLog.getServer());
+ recordSip.put("server_type", sipOriginAllLog.getServer_type());
+ recordSip.put("req_via_json", sipOriginAllLog.getReq_Via_Json());
+ recordSip.put("req_contact", sipOriginAllLog.getReq_Contact());
+ recordSip.put("req_contact_nickname", sipOriginAllLog.getReq_Contact_Nickname());
+ recordSip.put("req_contact_usr_name", sipOriginAllLog.getReq_Contact_usr_name());
+ recordSip.put("req_contact_ser_domain", sipOriginAllLog.getReq_Contact_ser_domain());
+ recordSip.put("req_ser_domain_valid", sipOriginAllLog.getReq_ser_domain_valid());
+ recordSip.put("req_record_route_json", sipOriginAllLog.getReq_Record_Route_Json());
+ recordSip.put("req_route_json", sipOriginAllLog.getReq_Route_Json());
+ recordSip.put("req_expires", sipOriginAllLog.getReq_Expires());
+ recordSip.put("req_others", sipOriginAllLog.getReq_Others());
+ recordSip.put("req_content_type", sipOriginAllLog.getReq_Content_Type());
+ recordSip.put("req_content", sipOriginAllLog.getReq_Content());
+ recordSip.put("res_via_json", sipOriginAllLog.getRes_Via_Json());
+ recordSip.put("res_contact", sipOriginAllLog.getRes_Contact());
+ recordSip.put("res_contact_nickname", sipOriginAllLog.getRes_Contact_Nickname());
+ recordSip.put("res_contact_usr_name", sipOriginAllLog.getRes_Contact_usr_name());
+ recordSip.put("res_contact_ser_domain", sipOriginAllLog.getRes_Contact_ser_domain());
+ recordSip.put("res_ser_domain_valid", sipOriginAllLog.getRes_ser_domain_valid());
+ recordSip.put("res_record_route_json", sipOriginAllLog.getRes_Record_Route_Json());
+ recordSip.put("res_route_json", sipOriginAllLog.getRes_Route_Json());
+ recordSip.put("res_expires", sipOriginAllLog.getRes_Expires());
+ recordSip.put("res_others", sipOriginAllLog.getRes_Others());
+ recordSip.put("res_content_type", sipOriginAllLog.getRes_Content_Type());
+ recordSip.put("res_content", sipOriginAllLog.getRes_Content());
+ recordSip.put("req_coding", sipOriginAllLog.getReq_coding());
+ recordSip.put("res_coding", sipOriginAllLog.getRes_coding());
+ recordSip.put("stat_time", sipOriginAllLog.getStat_time());
+
+ fileWriter.append(recordSip);
+ fileWriter.flush();
+ }
+ break;
+ default:
+ logger.error("HdfsDataLoad_Avro toHdfs logType is error !!!This logType is--->{" + logType + "}<---");
+ break;
+ }
+
+ logger.warn("HdfsDataLoad_Avro data to HDFS Successful,hdfs_path is -->{" + hdfs_path + "}<---");
+
+ if (fileWriter != null) {
+ fileWriter.close();
+ fileWriter = null;
+ }
+ if (outputStream_avro != null) {
+ outputStream_avro.close();
+ outputStream_avro = null;
+ }
+
+ switch (logType) {
+ case "origin":
+ String tablenameSip = RealtimeCountConfig.HIVE_SIP_CLEAN_TABLE;
+ loadDataToHiveAvro(tablenameSip, partition, hdfs_path);
+ break;
+// case "route":
+// String tablenameFrag = RealtimeCountConfig.HIVE_SIP_ROUTE_TABLE;
+// loadDataToHiveAvro(tablenameFrag, partition, hdfs_path);
+// break;
+ default:
+ logger.error("HdfsDataLoad_Avro toHive logType is error !!!This logType is--->{" + logType + "}<---");
+ break;
+ }
+
+ } catch (Exception e) {
+ logger.error("HdfsDataLoad_Avro dataToHdfs method is error !!!--->{" + e + "}<---");
+ e.printStackTrace();
+ }
+ }
+
+ public void loadDataToHiveAvro(String tablename, String partition, String hdfs_path) {
+ String dataUrl = hdfs_path;
+ StringBuffer sb = new StringBuffer();
+ try {
+ con = ds.getConnection();
+ stmt = con.createStatement();
+ sb.append("load data inpath ").append("'").append(dataUrl).append("'")
+ .append(" into table ").append(tablename).append(" partition( time_partition=").append(partition).append(")");
+ stmt.execute(sb.toString());
+
+ logger.warn("HdfsDataLoad_Avro data to Hive Successful,dataUrl is -->{" + dataUrl + "}<---");
+
+ } catch (Exception e) {
+ logger.error("HdfsDataLoad_Avro loadDataToHive method is error !!!--->{" + e + "}<---");
+ e.printStackTrace();
+ } finally {
+ try {
+ if (stmt != null) {
+ stmt.close();
+ stmt = null;
+ }
+ if (con != null) {
+ con.close();
+ con = null;
+ }
+ } catch (Exception e) {
+ logger.error("HdfsDataLoad_Avro loadDataToHive when close is error !!!--->{" + e + "}<---");
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private String strDefaultValue(String str) {
+ return (StringUtil.isBlank(str)
+ || str == null
+ || str.equals("")
+ || str.length() == 0) ? "$#$" : str;
+ }
+}
diff --git a/src/main/java/cn/ac/iie/utils/HiveDao/HiveDataSourceUtil.java b/src/main/java/cn/ac/iie/utils/HiveDao/HiveDataSourceUtil.java
new file mode 100644
index 0000000..9223f92
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/HiveDao/HiveDataSourceUtil.java
@@ -0,0 +1,187 @@
+package cn.ac.iie.utils.HiveDao;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.fastjson.JSONException;
+import com.alibaba.fastjson.JSONObject;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import net.sf.json.JSONArray;
+import org.apache.log4j.Logger;
+
+import java.sql.*;
+import java.util.Properties;
+
+/**
+ * Hive-JDBC连接池
+ *
+ * @author Colbert
+ */
+public class HiveDataSourceUtil {
+ private static DruidDataSource hiveDataSource = new DruidDataSource();
+ public static Connection conn = null;
+ private static final Logger logger = Logger.getLogger(HiveDataSourceUtil.class);
+
+ public static DruidDataSource getHiveDataSource() {
+ if (hiveDataSource.isInited()) {
+ return hiveDataSource;
+ }
+
+ try {
+ Properties dsProp = new Properties();
+ dsProp.load(HiveDataSourceUtil.class.getClassLoader().getResourceAsStream("hive.properties"));
+ hiveDataSource.setDriverClassName(dsProp.getProperty("hive_jdbc_drivers"));
+ //基本属性 url、user、password
+ hiveDataSource.setUrl(dsProp.getProperty("hive_jdbc_url"));
+ hiveDataSource.setUsername(dsProp.getProperty("hive_jdbc_username"));
+ hiveDataSource.setPassword(dsProp.getProperty("hive_jdbc_password"));
+
+ //配置初始化大小、最小、最大
+ hiveDataSource.setInitialSize(Integer.parseInt(dsProp.getProperty("hive_initialSize")));
+ hiveDataSource.setMinIdle(Integer.parseInt(dsProp.getProperty("hive_minIdle")));
+ hiveDataSource.setMaxActive(Integer.parseInt(dsProp.getProperty("hive_maxActive")));
+
+ //配置获取连接等待超时的时间
+ hiveDataSource.setMaxWait(Integer.parseInt(dsProp.getProperty("hive_maxWait")));
+
+ //配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
+ hiveDataSource.setTimeBetweenEvictionRunsMillis(60000);
+
+ //配置一个连接在池中最小生存的时间,单位是毫秒
+ hiveDataSource.setMinEvictableIdleTimeMillis(300000);
+
+ hiveDataSource.setValidationQuery("SELECT 1");
+ hiveDataSource.setTestWhileIdle(true);
+ hiveDataSource.setTestOnBorrow(true);
+// hiveDataSource.setKeepAlive(true);
+
+ //打开PSCache,并且指定每个连接上PSCache的大小
+ hiveDataSource.setPoolPreparedStatements(true);
+ hiveDataSource.setMaxPoolPreparedStatementPerConnectionSize(20);
+
+ //配置监控统计拦截的filters
+// hiveDataSource.setFilters("stat");
+
+ hiveDataSource.init();
+ } catch (Exception e) {
+ e.printStackTrace();
+ closeHiveDataSource();
+ }
+ return hiveDataSource;
+ }
+
+ /**
+ * @Description:关闭Hive连接池
+ */
+ public static void closeHiveDataSource() {
+ if (hiveDataSource != null) {
+ hiveDataSource.close();
+ }
+ }
+
+ /**
+ * @return
+ * @Description:获取Hive连接
+ */
+ public static Connection getHiveConn() {
+ try {
+ hiveDataSource = getHiveDataSource();
+ conn = hiveDataSource.getConnection();
+ } catch (Exception e) {
+ logger.error("HiveDataSourceUtil--" + e + ":获取Hive连接失败!");
+ }
+ return conn;
+ }
+
+ /**
+ * @Description:关闭Hive数据连接
+ */
+ public static void closeConn() {
+ try {
+ if (conn != null) {
+ conn.close();
+ }
+ } catch (Exception e) {
+ logger.error("HiveDataSourceUtil--" + e + ":关闭Hive-conn连接失败!");
+ }
+ }
+
+
+ public static void main(String[] args) throws Exception {
+ DruidDataSource ds = HiveDataSourceUtil.getHiveDataSource();
+ Connection conn = ds.getConnection();
+ Statement stmt = null;
+ if (conn == null) {
+ System.out.println("null");
+ } else {
+ System.out.println("conn");
+ stmt = conn.createStatement();
+ ResultSet res = stmt.executeQuery("select * from test.frag_media_expire_log limit 10");
+ int i = 0;
+ while (res.next()) {
+ if (i < 10) {
+ System.out.println(res.getString(2));
+ i++;
+ }
+ }
+// String s = resultSetToJson(res);
+// String s = ResultSetToJsonString(res);
+// System.out.println(s);
+ }
+
+ stmt.close();
+ conn.close();
+ }
+
+ public static String resultSetToJson(ResultSet rs) throws SQLException, JSONException {
+ // json数组
+ JSONArray array = new JSONArray();
+
+ // 获取列数
+ ResultSetMetaData metaData = rs.getMetaData();
+ int columnCount = metaData.getColumnCount();
+
+ // 遍历ResultSet中的每条数据
+ while (rs.next()) {
+ JSONObject jsonObj = new JSONObject();
+
+ // 遍历每一列
+ for (int i = 1; i <= columnCount; i++) {
+ String columnName = metaData.getColumnLabel(i);
+ String value = rs.getString(columnName);
+ jsonObj.put(columnName, value);
+ }
+// array.put(jsonObj);
+ array.add(jsonObj);
+ }
+
+ return array.toString();
+ }
+
+ public static final JsonObject ResultSetToJsonObject(ResultSet rs) {
+ JsonObject element = null;
+ JsonArray ja = new JsonArray();
+ JsonObject jo = new JsonObject();
+ ResultSetMetaData rsmd = null;
+ String columnName, columnValue = null;
+ try {
+ rsmd = rs.getMetaData();
+ while (rs.next()) {
+ element = new JsonObject();
+ for (int i = 0; i < rsmd.getColumnCount(); i++) {
+ columnName = rsmd.getColumnName(i + 1);
+ columnValue = rs.getString(columnName);
+ element.addProperty(columnName, columnValue);
+ }
+ ja.add(element);
+ }
+ jo.add("result", ja);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ return jo;
+ }
+
+ public static final String ResultSetToJsonString(ResultSet rs) {
+ return ResultSetToJsonObject(rs).toString();
+ }
+}
diff --git a/src/main/java/cn/ac/iie/utils/IPIPLibrary/Ipip.java b/src/main/java/cn/ac/iie/utils/IPIPLibrary/Ipip.java
new file mode 100644
index 0000000..ec1febe
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/IPIPLibrary/Ipip.java
@@ -0,0 +1,189 @@
+package cn.ac.iie.utils.IPIPLibrary;
+
+/**
+ * Created by lizhao on 2017-9-15.
+ */
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.nio.charset.Charset;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class Ipip {
+
+ public static String randomIp() {
+ Random r = new Random();
+ StringBuffer str = new StringBuffer();
+ str.append(r.nextInt(1000000) % 255);
+ str.append(".");
+ str.append(r.nextInt(1000000) % 255);
+ str.append(".");
+ str.append(r.nextInt(1000000) % 255);
+ str.append(".");
+ str.append(0);
+
+ return str.toString();
+ }
+
+ public static void main(String[] args){
+ Ipip.load("E:\\ipip.dat");
+
+ Long st = System.nanoTime();
+ for (int i = 0; i < 100; i++)
+ {
+ //IP.find(randomIp());
+ System.out.println(Arrays.toString(Ipip.find(randomIp())));
+ }
+ Long et = System.nanoTime();
+ System.out.println((et - st) / 1000 / 1000);
+
+// System.out.println(Arrays.toString(IP.find("118.28.8.8")));
+ }
+
+ public static boolean enableFileWatch = false;
+
+ private static int offset;
+ private static int[] index = new int[256];
+ private static ByteBuffer dataBuffer;
+ private static ByteBuffer indexBuffer;
+ private static Long lastModifyTime = 0L;
+ private static File ipFile ;
+ private static ReentrantLock lock = new ReentrantLock();
+
+ public static void load(String filename) {
+ ipFile = new File(filename);
+ load();
+ if (enableFileWatch) {
+ watch();
+ }
+ }
+
+ public static void load(String filename, boolean strict) throws Exception {
+ ipFile = new File(filename);
+ if (strict) {
+ int contentLength = Long.valueOf(ipFile.length()).intValue();
+ if (contentLength < 512 * 1024) {
+ throw new Exception("ip data file error.");
+ }
+ }
+ load();
+ if (enableFileWatch) {
+ watch();
+ }
+ }
+
+ public static String[] find(String ip) {
+ int ip_prefix_value = new Integer(ip.substring(0, ip.indexOf(".")));
+ long ip2long_value = ip2long(ip);
+ int start = index[ip_prefix_value];
+ int max_comp_len = offset - 1028;
+ long index_offset = -1;
+ int index_length = -1;
+ byte b = 0;
+ for (start = start * 8 + 1024; start < max_comp_len; start += 8) {
+ if (int2long(indexBuffer.getInt(start)) >= ip2long_value) {
+ index_offset = bytesToLong(b, indexBuffer.get(start + 6), indexBuffer.get(start + 5), indexBuffer.get(start + 4));
+ index_length = 0xFF & indexBuffer.get(start + 7);
+ break;
+ }
+ }
+
+ byte[] areaBytes;
+
+ lock.lock();
+ try {
+ dataBuffer.position(offset + (int) index_offset - 1024);
+ areaBytes = new byte[index_length];
+ dataBuffer.get(areaBytes, 0, index_length);
+ } finally {
+ lock.unlock();
+ }
+
+ return new String(areaBytes, Charset.forName("UTF-8")).split("\t", -1);
+ }
+
+ private static void watch() {
+ Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ long time = ipFile.lastModified();
+ if (time > lastModifyTime) {
+ lastModifyTime = time;
+ load();
+ }
+ }
+ }, 1000L, 5000L, TimeUnit.MILLISECONDS);
+ }
+
+ private static void load() {
+ lastModifyTime = ipFile.lastModified();
+ FileInputStream fin = null;
+ lock.lock();
+ try {
+ dataBuffer = ByteBuffer.allocate(Long.valueOf(ipFile.length()).intValue());
+ fin = new FileInputStream(ipFile);
+ int readBytesLength;
+ byte[] chunk = new byte[4096];
+ while (fin.available() > 0) {
+ readBytesLength = fin.read(chunk);
+ dataBuffer.put(chunk, 0, readBytesLength);
+ }
+ dataBuffer.position(0);
+ int indexLength = dataBuffer.getInt();
+ byte[] indexBytes = new byte[indexLength];
+ dataBuffer.get(indexBytes, 0, indexLength - 4);
+ indexBuffer = ByteBuffer.wrap(indexBytes);
+ indexBuffer.order(ByteOrder.LITTLE_ENDIAN);
+ offset = indexLength;
+
+ int loop = 0;
+ while (loop++ < 256) {
+ index[loop - 1] = indexBuffer.getInt();
+ }
+ indexBuffer.order(ByteOrder.BIG_ENDIAN);
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ } finally {
+ try {
+ if (fin != null) {
+ fin.close();
+ }
+ } catch (IOException e){
+ e.printStackTrace();
+ }
+ lock.unlock();
+ }
+ }
+
+ private static long bytesToLong(byte a, byte b, byte c, byte d) {
+ return int2long((((a & 0xff) << 24) | ((b & 0xff) << 16) | ((c & 0xff) << 8) | (d & 0xff)));
+ }
+
+ private static int str2Ip(String ip) {
+ String[] ss = ip.split("\\.");
+ int a, b, c, d;
+ a = Integer.parseInt(ss[0]);
+ b = Integer.parseInt(ss[1]);
+ c = Integer.parseInt(ss[2]);
+ d = Integer.parseInt(ss[3]);
+ return (a << 24) | (b << 16) | (c << 8) | d;
+ }
+
+ private static long ip2long(String ip) {
+ return int2long(str2Ip(ip));
+ }
+
+ private static long int2long(int i) {
+ long l = i & 0x7fffffffL;
+ if (i < 0) {
+ l |= 0x080000000L;
+ }
+ return l;
+ }
+} \ No newline at end of file
diff --git a/src/main/java/cn/ac/iie/utils/RealtimeCountConfigurations.java b/src/main/java/cn/ac/iie/utils/RealtimeCountConfigurations.java
new file mode 100644
index 0000000..2a21798
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/RealtimeCountConfigurations.java
@@ -0,0 +1,67 @@
+package cn.ac.iie.utils;
+
+import java.util.Properties;
+
+//import com.nis.util.StringUtil;
+public final class RealtimeCountConfigurations {
+
+ private static Properties propCommon = new Properties();//0
+ private static Properties propService = new Properties();//1
+
+ public static String getStringProperty(Integer type, String key) {
+ if(type == 0){
+ return propCommon.getProperty(key);
+ } else if(type == 1){
+ return propService.getProperty(key);
+ } else {
+ return null;
+ }
+ }
+
+
+ public static Integer getIntProperty(Integer type, String key) {
+ if(type == 0){
+ return Integer.parseInt(propCommon.getProperty(key));
+ } else if(type == 1){
+ return Integer.parseInt(propService.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Long getLongProperty(Integer type, String key) {
+ if(type == 0){
+ return Long.parseLong(propCommon.getProperty(key));
+ } else if(type == 1){
+ return Long.parseLong(propService.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Boolean getBooleanProperty(Integer type, String key) {
+ if(type == 0){
+ return propCommon.getProperty(key).toLowerCase().trim().equals("true");
+ } else if(type == 1){
+ return propService.getProperty(key).toLowerCase().trim().equals("true");
+ } else {
+ return null;
+ }
+ }
+
+ static {
+ try {
+ propCommon.load(RealtimeCountConfigurations.class.getClassLoader().getResourceAsStream("realtime_config.properties"));//0
+ propService.load(RealtimeCountConfigurations.class.getClassLoader().getResourceAsStream("storm_config.properties"));//1
+ /*prop.load(new FileInputStream(System.getProperty("user.dir")
+ + File.separator + "config"+File.separator + "config.properties"));*/
+ System.out.println("realtime_config.properties加载成功");
+ System.out.println("storm_config.properties加载成功");
+
+ } catch (Exception e) {
+ propCommon = null;
+ propService = null;
+ System.err.println("RealtimeCountConfigurations配置文件加载失败");
+ }
+ }
+}
diff --git a/src/main/java/cn/ac/iie/utils/TupleUtils.java b/src/main/java/cn/ac/iie/utils/TupleUtils.java
new file mode 100644
index 0000000..1a5889a
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/TupleUtils.java
@@ -0,0 +1,13 @@
+package cn.ac.iie.utils;
+
+import org.apache.storm.Constants;
+import org.apache.storm.tuple.Tuple;
+
+public final class TupleUtils {
+ //判断是否系统自动发送的Tuple
+ public static boolean isTick(Tuple tuple) {
+ return tuple != null
+ && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
+ && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
+ }
+}
diff --git a/src/main/java/cn/ac/iie/utils/dao/ClickHouseUtils.java b/src/main/java/cn/ac/iie/utils/dao/ClickHouseUtils.java
new file mode 100644
index 0000000..3dced6d
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/dao/ClickHouseUtils.java
@@ -0,0 +1,47 @@
+package cn.ac.iie.utils.dao;
+
+
+import com.zdjizhi.utils.StringUtil;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+/**
+ * ClickHouse 入库类型转换类
+ *
+ * @author Administrator
+ */
+public class ClickHouseUtils {
+
+ public static void setInt(PreparedStatement pstms, int index, String str) {
+ try {
+ int num = 0;
+ if (str != null) {
+ num = Integer.parseInt(str);
+ }
+ pstms.setInt(index, num);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public static void setString(PreparedStatement pstmts, int index, String str) throws Exception {
+ if (StringUtil.isNotBlank(str)) {
+ pstmts.setString(index, str);
+ } else {
+ str = "";
+ pstmts.setString(index, str);
+ }
+ }
+
+ public static void setTimeStamp(PreparedStatement pstmts, int index, String str) throws Exception {
+ pstmts.setTimestamp(index, new Timestamp(Long.parseLong(str + "000")));
+ }
+
+ public static void setLong(PreparedStatement pstmts, int index, String str) throws Exception {
+ pstmts.setLong(index, Long.parseLong(str));
+ }
+
+
+}
diff --git a/src/main/java/cn/ac/iie/utils/getjson/GetJsonToKafkaUtils.java b/src/main/java/cn/ac/iie/utils/getjson/GetJsonToKafkaUtils.java
new file mode 100644
index 0000000..a73aa7d
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/getjson/GetJsonToKafkaUtils.java
@@ -0,0 +1,835 @@
+//package cn.ac.iie.utils.getjson;
+//
+//import cn.ac.iie.bean.dk.DK_BEHAVIOR_LOG;
+//import cn.ac.iie.bean.mm.MM_AV_IP_LOG;
+//import cn.ac.iie.bean.mm.MM_VOIP_IP_LOG;
+//import cn.ac.iie.bean.ntc.NTC_CONN_RECORD_LOG;
+//import cn.ac.iie.bean.pxy.PXY_HTTP_LOG;
+//import cn.ac.iie.common.FlowWriteConfig;
+//import cn.ac.iie.common.RealtimeCountConfig;
+//import cn.ac.iie.utils.ordinary.DecodeUtils;
+//import cn.ac.iie.utils.ordinary.MD5Utils;
+//import cn.ac.iie.utils.ordinary.TransFormUtils;
+//import com.alibaba.fastjson.JSONObject;
+//import com.alibaba.fastjson.parser.Feature;
+//import com.zdjizhi.utils.IpLookup;
+//import com.zdjizhi.utils.StringUtil;
+//import org.apache.log4j.Logger;
+//
+///**
+// * @author antlee
+// * @date 2018/7/19
+// */
+//public class GetJsonToKafkaUtils {
+// private static Logger logger = Logger.getLogger(GetJsonToKafkaUtils.class);
+// private static IpLookup ipLookup = new IpLookup.Builder(false)
+// .loadDataFileV4(RealtimeCountConfig.IP_LIBRARY)
+// .loadDataFileV6(RealtimeCountConfig.IP_LIBRARY)
+// .build();
+//
+//
+// /**
+// * NTC topic对准类
+// *
+// * @param message 日志
+// * @param topic topic名称
+// * @return 补全日志
+// */
+// public static String getNTCData(String message, String topic) {
+// switch (topic) {
+// case "NTC-HTTP-LOG":
+// return httpReplenish(message);
+// case "NTC-MAIL-LOG":
+// return mailReplenish(message);
+// case "NTC-IP-LOG":
+// return ipReplenish(message);
+// case "NTC-APP-LOG":
+// return appReplenish(message);
+// case "NTC-SSL-LOG":
+// return sslReplenish(message);
+// case "NTC-DDOS-LOG":
+// return ddosReplenish(message);
+// case "NTC-DNS-LOG":
+// return dnsReplenish(message);
+// case "NTC-COLLECT-MAIL-LOG":
+// return mailReplenish(message);
+// case "NTC-OPENVPN-LOG":
+// return openVpnLog(message);
+// case "NTC-P2P-LOG":
+// return p2pReplenish(message);
+// case "PXY-HTTP-LOG":
+// return pxyHttpReplenish(message);
+// case "NTC-BGP-LOG":
+// return bgpReplenish(message);
+// case "NTC-FTP-LOG":
+// return ftpReplenish(message);
+// case "NTC-STREAMING-MEDIA-LOG":
+// return streamMediaReplenish(message);
+// case "NTC-VOIP-LOG":
+// return voipReplenish(message);
+//
+//// case "NTC-COLLECT-HTTP-LOG":
+//// return message;
+//// case "NTC-CONN-RECORD-LOG":
+//// return message;
+//// case "NTC-COLLECT-RADIUS-LOG":
+//// return message;
+//// case "NTC-KEYWORDS-URL-LOG":
+//// return message;
+//
+//// case "NTC-IPSEC-LOG":
+//// return ipsecLog(message);
+//// case "NTC-L2TP-LOG":
+//// return l2tpLog(message);
+//// case "NTC-SSH-LOG":
+//// return sshLog(message);
+//// case "NTC-PPTP-LOG":
+//// return pptpReplenish(message);
+// default:
+// logger.error("There is no corresponding topic! topic name is :" + topic);
+// break;
+// }
+// return null;
+// }
+//
+//
+// /**
+// * HTTP Log Replenish
+// *
+// * @param message
+// * @return
+// */
+// private static String httpReplenish(String message) {
+// try {
+// NTC_HTTP_LOG ntcHttpLog = JSONObject.parseObject(message, NTC_HTTP_LOG.class);
+// String sIp = ntcHttpLog.getS_ip();
+// String dIp = ntcHttpLog.getD_ip();
+// ntcHttpLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcHttpLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcHttpLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcHttpLog.setS_asn(ipLookup.asnLookup(sIp, true));
+// ntcHttpLog.setWebsite(StringUtil.getDomain(ntcHttpLog.getUrl()));
+//// ntcHttpLog.setD_subscribe_id(RedisPollUtils.getJedisCluster().get(dIP));
+//// ntcHttpLog.setS_subscribe_id(RedisPollUtils.getJedisCluster().get(sIp));
+// TransFormUtils.setHTTPFile(ntcHttpLog);
+// return JSONObject.toJSONString(ntcHttpLog);
+// } catch (Exception e) {
+// logger.error(("HTTP 补全日志出现错误!!! ") + e);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+//
+// /**
+// * HTTP 文件方法
+// *
+// * @param message
+// * @return
+// */
+//
+// public static String httpLogFile(String message) {
+// try {
+// NTC_HTTP_LOG ntcHttpLog = JSONObject.parseObject(message, NTC_HTTP_LOG.class);
+// int cfgId = ntcHttpLog.getCfg_id();
+// if (StringUtil.isBlank(ntcHttpLog.getUrl())) {
+// TransFormUtils.getUniFlow(ntcHttpLog);
+// }
+// String url = ntcHttpLog.getUrl();
+// String reqBody = ntcHttpLog.getReq_body_file();
+// if (StringUtil.isNotBlank(reqBody)) {
+// ntcHttpLog.setReq_body_key(MD5Utils.md5Encode(cfgId + url + ntcHttpLog.getReq_body_file() + ntcHttpLog.getS_ip() + ntcHttpLog.getFound_time()));
+// }
+// ntcHttpLog.setRes_body_key(TransFormUtils.getHttpKey(cfgId, url, ntcHttpLog.getRes_body_file()));
+// ntcHttpLog.setReq_hdr_key(TransFormUtils.getHttpKey(cfgId, url, ntcHttpLog.getReq_hdr_file()));
+// ntcHttpLog.setRes_hdr_key(TransFormUtils.getHttpKey(cfgId, url, ntcHttpLog.getRes_hdr_file()));
+// return JSONObject.toJSONString(ntcHttpLog);
+// } catch (Exception e) {
+// logger.error(("HTPP 文件补全出现错误!!! ") + e);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+// /**
+// * IP日志不全操作方法
+// *
+// * @param message
+// * @return
+// */
+// private static String ipReplenish(String message) {
+// try {
+// NTC_IP_LOG ntcIpLog = JSONObject.parseObject(message, NTC_IP_LOG.class);
+// String sIp = ntcIpLog.getS_ip();
+// String dIp = ntcIpLog.getD_ip();
+// ntcIpLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcIpLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcIpLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcIpLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// ntcIpLog.setS_subscribe_id(TransFormUtils.getSubscribe(sIp));
+//// ntcIpLog.setD_subscribe_id(TransFormUtils.getSubscribe(dIP));
+// return JSONObject.toJSONString(ntcIpLog);
+// } catch (Exception e) {
+// logger.error(("IP 日志补全操作出现异常!!! ") + e);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+// /**
+// * MAIL文件传输方法
+// *
+// * @param message
+// * @return
+// */
+// public static String emailFile(String message) {
+// try {
+// NTC_MAIL_LOG ntcMailLog = JSONObject.parseObject(message, NTC_MAIL_LOG.class);
+// ntcMailLog.setEml_key(TransFormUtils.getEmlKey(ntcMailLog.getFound_time(), ntcMailLog.getMail_from(),
+// ntcMailLog.getMail_to(), ntcMailLog.getSubject(), ntcMailLog.getEml_file()));
+// if (StringUtil.isNotBlank(ntcMailLog.getSubject())) {
+// String subjectCharset = JSONObject.parseObject(message).getString("subject_charset");
+// ntcMailLog.setSubject(DecodeUtils.base64Str(ntcMailLog.getSubject(), subjectCharset));
+// }
+// return JSONObject.toJSONString(ntcMailLog);
+// } catch (Exception e) {
+// logger.error(("MAIL 文件传输出现错误!!! ") + e);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+// /**
+// * MAIL 日志详细信息补全方法
+// *
+// * @param message
+// * @return
+// */
+// private static String mailReplenish(String message) {
+// try {
+// NTC_MAIL_LOG ntcMailLog = JSONObject.parseObject(message, NTC_MAIL_LOG.class);
+// String sIp = ntcMailLog.getS_ip();
+// String dIp = ntcMailLog.getD_ip();
+// ntcMailLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcMailLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcMailLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcMailLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// ntcMailLog.setD_subscribe_id(TransFormUtils.getSubscribe(dIP));
+//// ntcMailLog.setS_subscribe_id(TransFormUtils.getSubscribe(sIp));
+// TransFormUtils.setMailFile(ntcMailLog);
+// return JSONObject.toJSONString(ntcMailLog);
+// } catch (Exception e) {
+// logger.error(("MAIL 日志补全出现错误!!! ") + e);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+// /**
+// * APP Log Replenish.
+// *
+// * @param message
+// * @return
+// */
+// private static String appReplenish(String message) {
+// try {
+// NTC_APP_LOG ntcAppLog = JSONObject.parseObject(message, NTC_APP_LOG.class);
+// String sIp = ntcAppLog.getS_ip();
+// String dIp = ntcAppLog.getD_ip();
+// ntcAppLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcAppLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcAppLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcAppLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// ntcAppLog.setD_subscribe_id(RedisPollUtils.getJedisCluster().get(dIP));
+//// ntcAppLog.setS_subscribe_id(RedisPollUtils.getJedisCluster().get(sIp));
+// return JSONObject.toJSONString(ntcAppLog);
+// } catch (Exception e) {
+// logger.error(("APP 日志补全出现错误!!! ") + e);
+// return "";
+// }
+// }
+//
+//
+// /**
+// * DDOS Log Replenish
+// *
+// * @param message
+// * @return
+// */
+// private static String ddosReplenish(String message) {
+// try {
+// NTC_DDOS_LOG ntcDdosLog = JSONObject.parseObject(message, NTC_DDOS_LOG.class);
+// String sIp = ntcDdosLog.getS_ip();
+// String dIp = ntcDdosLog.getD_ip();
+// ntcDdosLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcDdosLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcDdosLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcDdosLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// ntcDdosLog.setD_subscribe_id(RedisPollUtils.getJedisCluster().get(dIP));
+//// ntcDdosLog.setS_subscribe_id(RedisPollUtils.getJedisCluster().get(sIp));
+// return JSONObject.toJSONString(ntcDdosLog);
+// } catch (Exception e) {
+// logger.error(("DDOS 日志补全出现错误!!! ") + e);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+// /**
+// * SSL Log Replenish
+// *
+// * @param message
+// * @return
+// */
+// private static String sslReplenish(String message) {
+// try {
+// NTC_SSL_LOG ntcSslLog = JSONObject.parseObject(message, NTC_SSL_LOG.class);
+// String sIp = ntcSslLog.getS_ip();
+// String dIp = ntcSslLog.getD_ip();
+// ntcSslLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcSslLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcSslLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcSslLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// ntcSslLog.setS_subscribe_id(TransFormUtils.getSubscribe(sIp));
+//// ntcSslLog.setD_subscribe_id(TransFormUtils.getSubscribe(dIP));
+// return JSONObject.toJSONString(ntcSslLog);
+// } catch (Exception e) {
+// logger.error("SSL 日志补全出现错误!!!" + e);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+// /**
+// * DNS Log Replenish
+// *
+// * @param message
+// * @return
+// */
+// private static String dnsReplenish(String message) {
+// try {
+// NTC_DNS_LOG ntcDnsLog = JSONObject.parseObject(message, NTC_DNS_LOG.class);
+// String sIp = ntcDnsLog.getS_ip();
+// String dIp = ntcDnsLog.getD_ip();
+// ntcDnsLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcDnsLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcDnsLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcDnsLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// ntcDnsLog.setD_subscribe_id(RedisPollUtils.getJedisCluster().get(dIP));
+//// ntcDnsLog.setS_subscribe_id(RedisPollUtils.getJedisCluster().get(sIp));
+// return JSONObject.toJSONString(ntcDnsLog);
+// } catch (Exception e) {
+// logger.error(("DNS 日志补全出现错误!!! ") + e);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+// /**
+// * P2P Log Replenish
+// *
+// * @param message
+// * @return
+// */
+// private static String p2pReplenish(String message) {
+// try {
+// NTC_P2P_LOG ntcP2PLog = JSONObject.parseObject(message, NTC_P2P_LOG.class);
+// String sIp = ntcP2PLog.getS_ip();
+// String dIp = ntcP2PLog.getD_ip();
+// ntcP2PLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcP2PLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcP2PLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcP2PLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// ntcP2PLog.setS_subscribe_id(TransFormUtils.getSubscribe(sIp));
+//// ntcP2PLog.setD_subscribe_id(TransFormUtils.getSubscribe(dIP));
+// return JSONObject.toJSONString(ntcP2PLog);
+// } catch (Exception e) {
+// logger.error(("P2P 日志补全出现错误!!! ") + e);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+// /**
+// * pxy 文件方法
+// *
+// * @param message
+// * @return
+// */
+//
+// public static String pxyLogFile(String message) {
+// try {
+// PXY_HTTP_LOG pxyHttpLog = JSONObject.parseObject(message, PXY_HTTP_LOG.class);
+// int cfgId = pxyHttpLog.getCfg_id();
+// String url = pxyHttpLog.getUrl();
+// pxyHttpLog.setReq_body_key(MD5Utils.md5Encode(cfgId + url + pxyHttpLog.getReq_body() + pxyHttpLog.getS_ip() + pxyHttpLog.getFound_time()));
+// pxyHttpLog.setResp_body_key(TransFormUtils.getHttpKey(cfgId, url, pxyHttpLog.getResp_body()));
+// pxyHttpLog.setWebsite(StringUtil.getDomain(pxyHttpLog.getUrl()));
+// return JSONObject.toJSONString(pxyHttpLog);
+// } catch (Exception e) {
+// logger.error(("PXY 文件补全出现错误!!! ") + e);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+//
+// /**
+// * PXYHTTP Log Replenish
+// *
+// * @param message
+// * @return
+// */
+// private static String pxyHttpReplenish(String message) {
+// try {
+// PXY_HTTP_LOG pxyHttpLog = JSONObject.parseObject(message, PXY_HTTP_LOG.class);
+// String sIp = pxyHttpLog.getS_ip();
+// String dIp = pxyHttpLog.getD_ip();
+// pxyHttpLog.setServer_locate(ipLookup.countryLookup(dIp));
+// pxyHttpLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// pxyHttpLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// pxyHttpLog.setS_asn(ipLookup.asnLookup(sIp, true));
+// if (StringUtil.isNotBlank(pxyHttpLog.getReq_body())) {
+// pxyHttpLog.setReq_body("http://" + FlowWriteConfig.ASTANA_OSS_ADDRS + "/download/" + pxyHttpLog.getReq_body_key() + "." + "html");
+// }
+// if (StringUtil.isNotBlank(pxyHttpLog.getResp_body())) {
+// pxyHttpLog.setResp_body("http://" + FlowWriteConfig.ASTANA_OSS_ADDRS + "/download/" + pxyHttpLog.getResp_body_key() + "." + "html");
+// }
+// return JSONObject.toJSONString(pxyHttpLog);
+// } catch (Exception e) {
+// logger.error("PXY 日志补全出现错误!!!" + e);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+// /**
+// * BGP Log Replenish
+// *
+// * @param message
+// * @return
+// */
+// private static String bgpReplenish(String message) {
+// try {
+// NTC_BGP_LOG ntcBgpLog = JSONObject.parseObject(message, NTC_BGP_LOG.class);
+// String sIp = ntcBgpLog.getS_ip();
+// String dIp = ntcBgpLog.getD_ip();
+// ntcBgpLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcBgpLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcBgpLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcBgpLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// ntcBgpLog.setD_subscribe_id(RedisPollUtils.getJedisCluster().get(dIP));
+//// ntcBgpLog.setS_subscribe_id(RedisPollUtils.getJedisCluster().get(sIp));
+// return JSONObject.toJSONString(ntcBgpLog);
+// } catch (Exception e) {
+// logger.error(("BGP 日志补全出现错误!!! ") + e);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+// /**
+// * FTP Log Replenish
+// *
+// * @param message
+// * @return
+// */
+// private static String ftpReplenish(String message) {
+// try {
+// NTC_FTP_LOG ntcFtpLog = JSONObject.parseObject(message, NTC_FTP_LOG.class);
+// String sIp = ntcFtpLog.getS_ip();
+// String dIp = ntcFtpLog.getD_ip();
+// ntcFtpLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcFtpLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcFtpLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcFtpLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// ntcFtpLog.setD_subscribe_id(RedisPollUtils.getJedisCluster().get(dIP));
+//// ntcFtpLog.setS_subscribe_id(RedisPollUtils.getJedisCluster().get(sIp));
+// return JSONObject.toJSONString(ntcFtpLog);
+// } catch (Exception e) {
+// logger.error(("FTP 日志补全出现错误!!! ") + e);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+// /**
+// * Stream Media Log Replenish
+// *
+// * @param message
+// * @return
+// */
+// private static String streamMediaReplenish(String message) {
+// try {
+// NTC_STREAMING_MEDIA_LOG ntcStreamingMediaLog = JSONObject.parseObject(message, NTC_STREAMING_MEDIA_LOG.class);
+// String sIp = ntcStreamingMediaLog.getS_ip();
+// String dIp = ntcStreamingMediaLog.getD_ip();
+// ntcStreamingMediaLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcStreamingMediaLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcStreamingMediaLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcStreamingMediaLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// ntcStreamingMediaLog.setS_subscribe_id(TransFormUtils.getSubscribe(sIp));
+//// ntcStreamingMediaLog.setD_subscribe_id(TransFormUtils.getSubscribe(dIP));
+// return JSONObject.toJSONString(ntcStreamingMediaLog);
+// } catch (Exception e) {
+// logger.error(("Stream 日志补全出现错误!!! ") + e);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+// /**
+// * Voip Log Replenish
+// *
+// * @param message
+// * @return
+// */
+// private static String voipReplenish(String message) {
+// try {
+// NTC_VOIP_LOG ntcVoipLog = JSONObject.parseObject(message, NTC_VOIP_LOG.class);
+// String sIp = ntcVoipLog.getS_ip();
+// String dIp = ntcVoipLog.getD_ip();
+// ntcVoipLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcVoipLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcVoipLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcVoipLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// ntcVoipLog.setS_subscribe_id(TransFormUtils.getSubscribe(sIp));
+//// ntcVoipLog.setD_subscribe_id(TransFormUtils.getSubscribe(dIP));
+// return JSONObject.toJSONString(ntcVoipLog);
+// } catch (Exception e) {
+// logger.error(("VOIP 日志补全出现错误!!! ") + e);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+// private static String openVpnLog(String message) {
+// try {
+// NTC_OPENVPN_LOG ntcOpenvpnLog = JSONObject.parseObject(message, NTC_OPENVPN_LOG.class);
+// String sIp = ntcOpenvpnLog.getS_ip();
+// String dIp = ntcOpenvpnLog.getD_ip();
+// ntcOpenvpnLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcOpenvpnLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcOpenvpnLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcOpenvpnLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// ntcOpenvpnLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcOpenvpnLog.getS_ip()));
+//// ntcOpenvpnLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcOpenvpnLog.getD_ip()));
+// return JSONObject.toJSONString(ntcOpenvpnLog);
+// } catch (Exception e) {
+// logger.error(("Log parsing error!!! ") + e);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+//
+// /**
+// * 音视频topic对准类
+// *
+// * @param message 日志
+// * @param topic topic名
+// * @return 补全后日志
+// */
+// public static String getMMData(String message, String topic) {
+// switch (topic) {
+// case "NTC-COLLECT-VOIP-LOG":
+// return collectVoipLog(message);
+// case "MM-PORN-AUDIO-LEVEL-LOG":
+// return avIpLog(message);
+// case "MM-PORN-VIDEO-LEVEL-LOG":
+// return avIpLog(message);
+// case "MM-SAMPLE-AUDIO-LOG":
+// return avIpLog(message);
+// case "MM-SAMPLE-VIDEO-LOG":
+// return avIpLog(message);
+// case "MM-SAMPLE-PIC-LOG":
+// return avIpLog(message);
+// case "MM-SAMPLE-VOIP-LOG":
+// return voipIpLog(message);
+// case "MM-FILE-DIGEST-LOG":
+// return avIpLog(message);
+// case "MM-AV-IP-LOG":
+// return avIpLog(message);
+// case "MM-SPEAKER-RECOGNIZATION-LOG":
+// return avIpLog(message);
+// case "MM-LOGO-DETECTION-LOG":
+// return avIpLog(message);
+// case "MM-FACE-RECOGNIZATION-LOG":
+// return avIpLog(message);
+// case "MM-AV-URL-LOG":
+// return avIpLog(message);
+// case "MM-PIC-IP-LOG":
+// return avIpLog(message);
+// case "MM-PIC-URL-LOG":
+// return avIpLog(message);
+// case "MM-VOIP-IP-LOG":
+// return voipIpLog(message);
+// case "MM-VOIP-ACCOUNT-LOG":
+// return voipIpLog(message);
+// default:
+// logger.error("There is no corresponding topic! topic name is :" + topic);
+// break;
+// }
+// return null;
+// }
+//
+//
+// private static String avIpLog(String message) {
+// try {
+// MM_AV_IP_LOG mmAvIpLog = JSONObject.parseObject(message, MM_AV_IP_LOG.class);
+// String sIp = mmAvIpLog.getS_ip();
+// String dIp = mmAvIpLog.getD_ip();
+// mmAvIpLog.setServer_locate(ipLookup.countryLookup(dIp));
+// mmAvIpLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// mmAvIpLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// mmAvIpLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// mmAvIpLog.setS_subscribe_id(TransFormUtils.getSubscribe(sIp));
+//// mmAvIpLog.setD_subscribe_id(TransFormUtils.getSubscribe(dIP));
+// String url = mmAvIpLog.getLog_uri();
+// if (StringUtil.isNotBlank(url)) {
+// String key = MD5Utils.md5Encode(url);
+// String end = StringUtil.getFileExtendName(url).toLowerCase();
+// mmAvIpLog.setLog_uri("http://" + FlowWriteConfig.ALMATY_OSS_ADDRS + "/download/" + key + "." + end);
+// }
+// return JSONObject.toJSONString(mmAvIpLog);
+// } catch (Exception e) {
+// logger.error(("Log parsing error!!! ") + e);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+// private static String voipIpLog(String message) {
+// try {
+// MM_VOIP_IP_LOG mmVoipIpLog = JSONObject.parseObject(message, MM_VOIP_IP_LOG.class);
+// String sIp = mmVoipIpLog.getS_ip();
+// String dIp = mmVoipIpLog.getD_ip();
+// mmVoipIpLog.setServer_locate(ipLookup.countryLookup(dIp));
+// mmVoipIpLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// mmVoipIpLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// mmVoipIpLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// mmVoipIpLog.setS_subscribe_id(TransFormUtils.getSubscribe(sIp));
+//// mmVoipIpLog.setD_subscribe_id(TransFormUtils.getSubscribe(dIP));
+// String url = mmVoipIpLog.getLog_uri();
+// if (StringUtil.isNotBlank(url)) {
+// String key = MD5Utils.md5Encode(url);
+// String end = StringUtil.getFileExtendName(url).toLowerCase();
+// mmVoipIpLog.setLog_uri("http://" + FlowWriteConfig.ALMATY_OSS_ADDRS + "/download/" + key + "." + end);
+// }
+// return JSONObject.toJSONString(mmVoipIpLog);
+// } catch (Exception e) {
+// logger.error(("Log parsing error!!! ") + e);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+//
+// private static String collectVoipLog(String message) {
+// try {
+// NTC_COLLECT_VOIP_LOG ntcCollectVoipLog = JSONObject.parseObject(message, NTC_COLLECT_VOIP_LOG.class);
+// if (StringUtil.isNotBlank(ntcCollectVoipLog.getTo_from_store_url())) {
+// String key = MD5Utils.md5Encode(ntcCollectVoipLog.getPid() + ntcCollectVoipLog.getTo_from_store_url() + ntcCollectVoipLog.getFound_time());
+// String url = ntcCollectVoipLog.getTo_from_store_url();
+// String end = StringUtil.getFileExtendName(url).toLowerCase();
+// ntcCollectVoipLog.setTo_from_store_url("http://" + FlowWriteConfig.ALMATY_OSS_ADDRS + "/download/" + key + "." + end);
+// }
+// if (StringUtil.isNotBlank(ntcCollectVoipLog.getFrom_to_store_url())) {
+// String key = MD5Utils.md5Encode(ntcCollectVoipLog.getPid() + ntcCollectVoipLog.getFrom_to_store_url() + ntcCollectVoipLog.getFound_time());
+// String url = ntcCollectVoipLog.getFrom_to_store_url();
+// String end = StringUtil.getFileExtendName(url).toLowerCase();
+// ntcCollectVoipLog.setFrom_to_store_url("http://" + FlowWriteConfig.ALMATY_OSS_ADDRS + "/download/" + key + "." + end);
+// }
+// return JSONObject.toJSONString(ntcCollectVoipLog);
+// } catch (Exception e) {
+// logger.error(("Log parsing error!!! ") + e);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+//
+// /**
+// * 其他topic对准类
+// *
+// * @param message 日志
+// * @return 补全后日志
+// */
+// public static String getOtherData(String message, String topic) {
+// switch (topic) {
+// case "NTC-HTTP-LOG":
+// return otherHttpLog(message);
+// case "NTC-CONN-RECORD-LOG":
+// NTC_CONN_RECORD_LOG ntcConnRecordLog = JSONObject.parseObject(message, NTC_CONN_RECORD_LOG.class, Feature.OrderedField);
+// ntcConnRecordLog.setD_asn(ipLookup.asnLookup(ntcConnRecordLog.getD_ip(), true));
+// ntcConnRecordLog.setS_asn(ipLookup.asnLookup(ntcConnRecordLog.getS_ip(), true));
+// return JSONObject.toJSONString(ntcConnRecordLog);
+// default:
+// logger.error("There is no corresponding topic! topic name is :" + topic);
+// break;
+// }
+// return null;
+// }
+//
+//
+// private static String otherHttpLog(String message) {
+// NTC_HTTP_LOG ntcHttpLog = JSONObject.parseObject(message, NTC_HTTP_LOG.class);
+// if (ntcHttpLog.getService() == 152) {
+// if (StringUtil.isBlank(ntcHttpLog.getUrl())) {
+// TransFormUtils.getUniFlow(ntcHttpLog);
+// }
+// String sIp = ntcHttpLog.getS_ip();
+// String dIp = ntcHttpLog.getD_ip();
+// ntcHttpLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcHttpLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcHttpLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcHttpLog.setS_asn(ipLookup.asnLookup(sIp, true));
+// ntcHttpLog.setWebsite(StringUtil.getDomain(ntcHttpLog.getUrl()));
+//// ntcHttpLog.setD_subscribe_id(RedisPollUtils.getJedisCluster().get(dIP));
+//// ntcHttpLog.setS_subscribe_id(RedisPollUtils.getJedisCluster().get(sIp));
+// return JSONObject.toJSONString(ntcHttpLog);
+// }
+// return "";
+// }
+//
+//
+// /**
+// * collect Mail to Mail
+// *
+// * @param message
+// * @return
+// */
+// public static String collectMailToMailLog(String message) {
+// try {
+// NTC_MAIL_LOG ntcMailLog = JSONObject.parseObject(message, NTC_MAIL_LOG.class);
+// String sIp = ntcMailLog.getS_ip();
+// String dIp = ntcMailLog.getD_ip();
+// ntcMailLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcMailLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcMailLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcMailLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// ntcMailLog.setS_subscribe_id(TransFormUtils.getSubscribe(sIp));
+//// ntcMailLog.setD_subscribe_id(TransFormUtils.getSubscribe(dIP));
+// ntcMailLog.setEml_key(TransFormUtils.getEmlKey(ntcMailLog.getFound_time(), ntcMailLog.getMail_from(),
+// ntcMailLog.getMail_to(), ntcMailLog.getSubject(), ntcMailLog.getEml_file()));
+// if (StringUtil.isNotBlank(ntcMailLog.getSubject())) {
+// String subjectCharset = JSONObject.parseObject(message).getString("subject_charset");
+// ntcMailLog.setSubject(DecodeUtils.base64Str(ntcMailLog.getSubject(), subjectCharset));
+// }
+// TransFormUtils.setMailFile(ntcMailLog);
+// return JSONObject.toJSONString(ntcMailLog);
+// } catch (Exception e) {
+// logger.error(("Log parsing error!!! ") + e);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+//
+// /**
+// * ---------------------------------删除的topic------------------------------------------------------------
+// **/
+//
+// private static String behaviorLog(String message) {
+// try {
+// DK_BEHAVIOR_LOG dkBehaviorLog = JSONObject.parseObject(message, DK_BEHAVIOR_LOG.class, Feature.OrderedField);
+// dkBehaviorLog.setServer_locate(ipLookup.countryLookup(dkBehaviorLog.getD_ip()));
+// dkBehaviorLog.setClient_locate(ipLookup.cityLookupDetail(dkBehaviorLog.getS_ip()));
+// dkBehaviorLog.setD_asn(ipLookup.asnLookup(dkBehaviorLog.getD_ip()).trim());
+// dkBehaviorLog.setS_asn(ipLookup.asnLookup(dkBehaviorLog.getS_ip()).trim());
+//// dkBehaviorLog.setS_subscribe_id(TransFormUtils.getSubscribe(dkBehaviorLog.getS_ip()));
+//// dkBehaviorLog.setD_subscribe_id(TransFormUtils.getSubscribe(dkBehaviorLog.getD_ip()));
+// return JSONObject.toJSONString(dkBehaviorLog);
+// } catch (Exception e) {
+// logger.error(("Log parsing error!!! ") + e);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+// private static String ipsecLog(String message) {
+// try {
+// NTC_IPSEC_LOG ntcIpsecLog = JSONObject.parseObject(message, NTC_IPSEC_LOG.class, Feature.OrderedField);
+// ntcIpsecLog.setServer_locate(ipLookup.countryLookup(ntcIpsecLog.getD_ip()));
+// ntcIpsecLog.setClient_locate(ipLookup.cityLookupDetail(ntcIpsecLog.getS_ip()));
+// ntcIpsecLog.setD_asn(ipLookup.asnLookup(ntcIpsecLog.getD_ip()).trim());
+// ntcIpsecLog.setS_asn(ipLookup.asnLookup(ntcIpsecLog.getS_ip()).trim());
+//// ntcIpsecLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcIpsecLog.getS_ip()));
+//// ntcIpsecLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcIpsecLog.getD_ip()));
+// return JSONObject.toJSONString(ntcIpsecLog);
+// } catch (Exception e) {
+// logger.error(("Log parsing error!!! ") + e);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+// private static String l2tpLog(String message) {
+// try {
+// NTC_L2TP_LOG ntcL2TPLog = JSONObject.parseObject(message, NTC_L2TP_LOG.class, Feature.OrderedField);
+// ntcL2TPLog.setServer_locate(ipLookup.countryLookup(ntcL2TPLog.getD_ip()));
+// ntcL2TPLog.setClient_locate(ipLookup.cityLookupDetail(ntcL2TPLog.getS_ip()));
+// ntcL2TPLog.setD_asn(ipLookup.asnLookup(ntcL2TPLog.getD_ip()).trim());
+// ntcL2TPLog.setS_asn(ipLookup.asnLookup(ntcL2TPLog.getS_ip()).trim());
+//// ntcL2TPLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcL2TPLog.getS_ip()));
+//// ntcL2TPLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcL2TPLog.getD_ip()));
+// return JSONObject.toJSONString(ntcL2TPLog);
+// } catch (Exception e) {
+// logger.error(("Log parsing error!!! ") + e);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+//
+// private static String sshLog(String message) {
+// try {
+// NTC_SSH_LOG ntcSshLog = JSONObject.parseObject(message, NTC_SSH_LOG.class, Feature.OrderedField);
+// ntcSshLog.setServer_locate(ipLookup.countryLookup(ntcSshLog.getD_ip()));
+// ntcSshLog.setClient_locate(ipLookup.cityLookupDetail(ntcSshLog.getS_ip()));
+// ntcSshLog.setD_asn(ipLookup.asnLookup(ntcSshLog.getD_ip()).trim());
+// ntcSshLog.setS_asn(ipLookup.asnLookup(ntcSshLog.getS_ip()).trim());
+// try {
+// ntcSshLog.setVersion(ntcSshLog.getVersion().replaceAll("\n", "\\\\n"));
+// ntcSshLog.setVersion(ntcSshLog.getVersion().replaceAll("\r", "\\\\r"));
+// } catch (Exception e) {
+// ntcSshLog.setVersion("");
+// }
+//// ntcSshLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcSshLog.getS_ip()));
+//// ntcSshLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcSshLog.getD_ip()));
+// return JSONObject.toJSONString(ntcSshLog);
+// } catch (Exception e) {
+// logger.error("Log parsing error!!! " + e);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+//
+// /**
+// * PPTP Log Replenish
+// *
+// * @param message
+// * @return
+// */
+// private static String pptpReplenish(String message) {
+// try {
+// NTC_PPTP_LOG ntcPptpLog = JSONObject.parseObject(message, NTC_PPTP_LOG.class);
+// ntcPptpLog.setServer_locate(ipLookup.countryLookup(ntcPptpLog.getD_ip()));
+// ntcPptpLog.setClient_locate(ipLookup.cityLookupDetail(ntcPptpLog.getS_ip()));
+// ntcPptpLog.setD_asn(ipLookup.asnLookup(ntcPptpLog.getD_ip()).trim());
+// ntcPptpLog.setS_asn(ipLookup.asnLookup(ntcPptpLog.getS_ip()).trim());
+//// ntcPptpLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcPptpLog.getS_ip()));
+//// ntcPptpLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcPptpLog.getD_ip()));
+// return JSONObject.toJSONString(ntcPptpLog);
+// } catch (Exception e) {
+// logger.error(("Log parsing error!!! ") + e);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+//
+// /**---------------------------------删除的topic------------------------------------------------------------**/
+//
+//}
diff --git a/src/main/java/cn/ac/iie/utils/getjson/GetStrToClickHouseUtils.java b/src/main/java/cn/ac/iie/utils/getjson/GetStrToClickHouseUtils.java
new file mode 100644
index 0000000..3ac5d1e
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/getjson/GetStrToClickHouseUtils.java
@@ -0,0 +1,585 @@
+//package cn.ac.iie.utils.getjson;
+//
+//import cn.ac.iie.bean.dk.DK_BEHAVIOR_LOG;
+//import cn.ac.iie.bean.mm.MM_AV_IP_LOG;
+//import cn.ac.iie.bean.mm.MM_VOIP_IP_LOG;
+//import cn.ac.iie.bean.ntc.NTC_CONN_RECORD_LOG;
+//import cn.ac.iie.bean.ntcwidely.NTC_COLLECT_SSL_LOG;
+//import cn.ac.iie.bean.pxy.PXY_HTTP_LOG;
+//import cn.ac.iie.common.FlowWriteConfig;
+//import cn.ac.iie.common.RealtimeCountConfig;
+//import cn.ac.iie.utils.ordinary.DecodeUtils;
+//import com.alibaba.fastjson.JSONObject;
+//import com.alibaba.fastjson.parser.Feature;
+//import com.zdjizhi.utils.IpLookup;
+//import com.zdjizhi.utils.StringUtil;
+//import org.apache.log4j.Logger;
+//
+///**
+// * @author antlee
+// * @date 2018/7/19
+// */
+//public class GetStrToClickHouseUtils {
+// private static Logger logger = Logger.getLogger(GetStrToClickHouseUtils.class);
+// private static IpLookup ipLookup = new IpLookup.Builder(false)
+// .loadDataFileV4(RealtimeCountConfig.IP_LIBRARY)
+// .loadDataFileV6(RealtimeCountConfig.IP_LIBRARY)
+// .build();
+//
+// /**
+// * NTC topic对准类
+// *
+// * @param message 日志
+// * @param topic topic名称
+// * @return 补全日志
+// */
+// public static String getData(String message, String topic) {
+// switch (topic) {
+// case "NTC-CONN-RECORD-LOG":
+// return ntcConnLog(message);
+// case "NTC-COLLECT-MAIL-LOG":
+// return mailLog(message);
+// case "NTC-COLLECT-SSL-LOG":
+// return sslLog(message);
+// case "NTC-SSL-LOG":
+// return sslLog(message);
+// case "NTC-APP-LOG":
+// return appLog(message);
+// case "NTC-HTTP-LOG":
+// return httpLog(message);
+// case "NTC-IP-LOG":
+// return ipLog(message);
+// case "PXY-HTTP-LOG":
+// return pHttpLog(message);
+// case "NTC-DNS-LOG":
+// return dnsLog(message);
+// case "NTC-BGP-LOG":
+// return bgpLog(message);
+// case "NTC-DDOS-LOG":
+// return ddosLog(message);
+// case "NTC-FTP-LOG":
+// return ftpLog(message);
+// case "NTC-MAIL-LOG":
+// return mailLog(message);
+// case "NTC-OPENVPN-LOG":
+// return openVpnLog(message);
+// case "NTC-P2P-LOG":
+// return p2pLog(message);
+// case "NTC-STREAMING-MEDIA-LOG":
+// return streamMediaLog(message);
+// case "NTC-VOIP-LOG":
+// return voipLog(message);
+// case "NTC-KEYWORDS-URL-LOG":
+// return message;
+// case "MM-SAMPLE-AUDIO-LOG":
+// return avIpLog(message);
+// case "MM-SAMPLE-VIDEO-LOG":
+// return avIpLog(message);
+// case "MM-PORN-AUDIO-LEVEL-LOG":
+// return avIpLog(message);
+// case "MM-PORN-VIDEO-LEVEL-LOG":
+// return avIpLog(message);
+// case "MM-SAMPLE-PIC-LOG":
+// return avIpLog(message);
+// case "MM-SAMPLE-VOIP-LOG":
+// return voipIpLog(message);
+// case "MM-FILE-DIGEST-LOG":
+// return avIpLog(message);
+// case "NTC-COLLECT-VOIP-LOG":
+// return collectVoipLog(message);
+//
+//// case "MM-AV-IP-LOG":
+//// return avIpLog(message);
+//// case "MM-AV-URL-LOG":
+//// return avIpLog(message);
+//// case "MM-PIC-IP-LOG":
+//// return avIpLog(message);
+//// case "MM-PIC-URL-LOG":
+//// return avIpLog(message);
+//// case "MM-VOIP-IP-LOG":
+//// return voipIpLog(message);
+//// case "MM-SPEAKER-RECOGNIZATION-LOG":
+//// return avIpLog(message);
+//// case "MM-LOGO-DETECTION-LOG":
+//// return avIpLog(message);
+//// case "MM-FACE-RECOGNIZATION-LOG":
+//// return avIpLog(message);
+//// case "MM-VOIP-ACCOUNT-LOG":
+//// return voipIpLog(message);
+//// case "NTC-IPSEC-LOG":
+//// return ipsecLog(message);
+//// case "NTC-L2TP-LOG":
+//// return l2tpLog(message);
+//// case "NTC-PPTP-LOG":
+//// return pptpLog(message);
+//// case "NTC-SSH-LOG":
+//// return sshLog(message);
+//
+// default:
+// logger.error("There is no corresponding topic! topic name is :" + topic);
+// break;
+// }
+// return null;
+// }
+//
+//
+// private static String ntcConnLog(String message) {
+// try {
+// NTC_CONN_RECORD_LOG ntcConnRecordLog = JSONObject.parseObject(message, NTC_CONN_RECORD_LOG.class);
+// String appLabel = ntcConnRecordLog.getApp_label();
+// String sIp = ntcConnRecordLog.getS_ip();
+// String dIp = ntcConnRecordLog.getD_ip();
+// ntcConnRecordLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcConnRecordLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcConnRecordLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcConnRecordLog.setS_asn(ipLookup.asnLookup(sIp, true));
+// if (StringUtil.isNotBlank(appLabel)) {
+// String[] split = ntcConnRecordLog.getApp_label().split(";");
+// ntcConnRecordLog.setProto_id(Integer.parseInt(split[0].split("=")[1]));
+// ntcConnRecordLog.setApp_id(Integer.parseInt(split[1].split("=")[1]));
+// ntcConnRecordLog.setOs_id(Integer.parseInt(split[2].split("=")[1]));
+// ntcConnRecordLog.setBs_id(Integer.parseInt(split[3].split("=")[1]));
+// ntcConnRecordLog.setWeb_id(Integer.parseInt(split[4].split("=")[1]));
+// ntcConnRecordLog.setBehav_id(Integer.parseInt(split[5].split("=")[1]));
+// } else {
+// ntcConnRecordLog.setProto_id(0);
+// ntcConnRecordLog.setApp_id(0);
+// ntcConnRecordLog.setOs_id(0);
+// ntcConnRecordLog.setBs_id(0);
+// ntcConnRecordLog.setWeb_id(0);
+// ntcConnRecordLog.setBehav_id(0);
+// }
+// return JSONObject.toJSONString(ntcConnRecordLog);
+// } catch (Exception e) {
+// logger.error("Log parsing error!!! \n" + message);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+//
+// private static String avIpLog(String message) {
+// try {
+// MM_AV_IP_LOG mmAvIpLog = JSONObject.parseObject(message, MM_AV_IP_LOG.class);
+// String sIp = mmAvIpLog.getS_ip();
+// String dIp = mmAvIpLog.getD_ip();
+// mmAvIpLog.setServer_locate(ipLookup.countryLookup(dIp));
+// mmAvIpLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// mmAvIpLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// mmAvIpLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// mmAvIpLog.setS_subscribe_id(TransFormUtils.getSubscribe(mmAvIpLog.getS_ip()));
+//// mmAvIpLog.setD_subscribe_id(TransFormUtils.getSubscribe(mmAvIpLog.getD_ip()));
+// return JSONObject.toJSONString(mmAvIpLog);
+// } catch (Exception e) {
+// logger.error("Log parsing error!!! \n" + message);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+// private static String voipIpLog(String message) {
+// try {
+// MM_VOIP_IP_LOG mmVoipIpLog = JSONObject.parseObject(message, MM_VOIP_IP_LOG.class);
+// String sIp = mmVoipIpLog.getS_ip();
+// String dIp = mmVoipIpLog.getD_ip();
+// mmVoipIpLog.setServer_locate(ipLookup.countryLookup(dIp));
+// mmVoipIpLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// mmVoipIpLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// mmVoipIpLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// mmVoipIpLog.setS_subscribe_id(TransFormUtils.getSubscribe(mmVoipIpLog.getS_ip()));
+//// mmVoipIpLog.setD_subscribe_id(TransFormUtils.getSubscribe(mmVoipIpLog.getD_ip()));
+// return JSONObject.toJSONString(mmVoipIpLog);
+// } catch (Exception e) {
+// logger.error("Log parsing error!!! \n" + message);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+// private static String collectVoipLog(String message) {
+// try {
+// NTC_COLLECT_VOIP_LOG ntcCollectVoipLog = JSONObject.parseObject(message, NTC_COLLECT_VOIP_LOG.class);
+// return JSONObject.toJSONString(ntcCollectVoipLog);
+// } catch (Exception e) {
+// e.printStackTrace();
+// logger.error("Log parsing error!!! \n" + message);
+// return "";
+// }
+// }
+//
+//
+// private static String appLog(String message) {
+// try {
+// NTC_APP_LOG ntcAppLog = JSONObject.parseObject(message, NTC_APP_LOG.class);
+// String sIp = ntcAppLog.getS_ip();
+// String dIp = ntcAppLog.getD_ip();
+// ntcAppLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcAppLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcAppLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcAppLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// ntcAppLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcAppLog.getS_ip()));
+//// ntcAppLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcAppLog.getD_ip()));
+// return JSONObject.toJSONString(ntcAppLog);
+// } catch (Exception e) {
+// e.printStackTrace();
+// logger.error("Log parsing error!!! \n" + message);
+// return "";
+// }
+// }
+//
+// private static String bgpLog(String message) {
+// try {
+// NTC_BGP_LOG ntcBgpLog = JSONObject.parseObject(message, NTC_BGP_LOG.class);
+// String sIp = ntcBgpLog.getS_ip();
+// String dIp = ntcBgpLog.getD_ip();
+// ntcBgpLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcBgpLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcBgpLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcBgpLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// ntcBgpLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcBgpLog.getS_ip()));
+//// ntcBgpLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcBgpLog.getD_ip()));
+// return JSONObject.toJSONString(ntcBgpLog);
+// } catch (Exception e) {
+// e.printStackTrace();
+// logger.error("Log parsing error!!! \n" + message);
+// return "";
+// }
+// }
+//
+// private static String ddosLog(String message) {
+// try {
+// NTC_DDOS_LOG ntcDdosLog = JSONObject.parseObject(message, NTC_DDOS_LOG.class);
+// String sIp = ntcDdosLog.getS_ip();
+// String dIp = ntcDdosLog.getD_ip();
+// ntcDdosLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcDdosLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcDdosLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcDdosLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// ntcDdosLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcDdosLog.getS_ip()));
+//// ntcDdosLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcDdosLog.getD_ip()));
+// return JSONObject.toJSONString(ntcDdosLog);
+// } catch (Exception e) {
+// e.printStackTrace();
+// logger.error("Log parsing error!!! \n" + message);
+// return "";
+// }
+// }
+//
+// private static String dnsLog(String message) {
+// try {
+// NTC_DNS_LOG ntcDnsLog = JSONObject.parseObject(message, NTC_DNS_LOG.class);
+// String sIp = ntcDnsLog.getS_ip();
+// String dIp = ntcDnsLog.getD_ip();
+// ntcDnsLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcDnsLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcDnsLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcDnsLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// ntcDnsLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcDnsLog.getS_ip()));
+//// ntcDnsLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcDnsLog.getD_ip()));
+// return JSONObject.toJSONString(ntcDnsLog);
+// } catch (Exception e) {
+// e.printStackTrace();
+// logger.error("Log parsing error!!! \n" + message);
+// return "";
+// }
+// }
+//
+// private static String ftpLog(String message) {
+// try {
+// NTC_FTP_LOG ntcFtpLog = JSONObject.parseObject(message, NTC_FTP_LOG.class);
+// String sIp = ntcFtpLog.getS_ip();
+// String dIp = ntcFtpLog.getD_ip();
+// ntcFtpLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcFtpLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcFtpLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcFtpLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// ntcFtpLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcFtpLog.getS_ip()));
+//// ntcFtpLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcFtpLog.getD_ip()));
+// return JSONObject.toJSONString(ntcFtpLog);
+// } catch (Exception e) {
+// e.printStackTrace();
+// logger.error("Log parsing error!!! \n" + message);
+// return "";
+// }
+// }
+//
+// private static String httpLog(String message) {
+// try {
+// NTC_HTTP_LOG ntcHttpLog = JSONObject.parseObject(message, NTC_HTTP_LOG.class);
+//// if (StringUtil.isBlank(ntcHttpLog.getUrl())) {
+//// TransFormUtils.getUniFlow(ntcHttpLog);
+//// }
+// String sIp = ntcHttpLog.getS_ip();
+// String dIp = ntcHttpLog.getD_ip();
+// ntcHttpLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcHttpLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcHttpLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcHttpLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// ntcHttpLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcHttpLog.getS_ip()));
+//// ntcHttpLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcHttpLog.getD_ip()));
+// ntcHttpLog.setWebsite(StringUtil.getDomain(ntcHttpLog.getUrl()));
+// return JSONObject.toJSONString(ntcHttpLog);
+// } catch (Exception e) {
+// e.printStackTrace();
+// logger.error("Log parsing error!!! \n" + message);
+// return "";
+// }
+// }
+//
+// private static String ipLog(String message) {
+// try {
+// NTC_IP_LOG ntcIpLog = JSONObject.parseObject(message, NTC_IP_LOG.class);
+// String sIp = ntcIpLog.getS_ip();
+// String dIp = ntcIpLog.getD_ip();
+// ntcIpLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcIpLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcIpLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcIpLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// ntcIpLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcIpLog.getS_ip()));
+//// ntcIpLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcIpLog.getD_ip()));
+// return JSONObject.toJSONString(ntcIpLog);
+// } catch (Exception e) {
+// e.printStackTrace();
+// logger.error("Log parsing error!!! \n" + message);
+// return "";
+// }
+// }
+//
+//
+// private static String mailLog(String message) {
+// try {
+// NTC_MAIL_LOG ntcMailLog = JSONObject.parseObject(message, NTC_MAIL_LOG.class);
+// String sIp = ntcMailLog.getS_ip();
+// String dIp = ntcMailLog.getD_ip();
+// ntcMailLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcMailLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcMailLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcMailLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// ntcMailLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcMailLog.getS_ip()));
+//// ntcMailLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcMailLog.getD_ip()));
+// if (StringUtil.isNotBlank(ntcMailLog.getSubject())) {
+// String subjectCharset = JSONObject.parseObject(message).getString("subject_charset");
+// ntcMailLog.setSubject(DecodeUtils.base64Str(ntcMailLog.getSubject(), subjectCharset));
+// }
+// return JSONObject.toJSONString(ntcMailLog);
+// } catch (Exception e) {
+// e.printStackTrace();
+// logger.error("Log parsing error!!! \n" + message);
+// return "";
+// }
+// }
+//
+// private static String openVpnLog(String message) {
+// try {
+// NTC_OPENVPN_LOG ntcOpenvpnLog = JSONObject.parseObject(message, NTC_OPENVPN_LOG.class);
+// String sIp = ntcOpenvpnLog.getS_ip();
+// String dIp = ntcOpenvpnLog.getD_ip();
+// ntcOpenvpnLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcOpenvpnLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcOpenvpnLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcOpenvpnLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// ntcOpenvpnLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcOpenvpnLog.getS_ip()));
+//// ntcOpenvpnLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcOpenvpnLog.getD_ip()));
+// return JSONObject.toJSONString(ntcOpenvpnLog);
+// } catch (Exception e) {
+// e.printStackTrace();
+// logger.error("Log parsing error!!! \n" + message);
+// return "";
+// }
+// }
+//
+// private static String p2pLog(String message) {
+// try {
+// NTC_P2P_LOG ntcP2PLog = JSONObject.parseObject(message, NTC_P2P_LOG.class);
+// String sIp = ntcP2PLog.getS_ip();
+// String dIp = ntcP2PLog.getD_ip();
+// ntcP2PLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcP2PLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcP2PLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcP2PLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// ntcP2PLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcP2PLog.getS_ip()));
+//// ntcP2PLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcP2PLog.getD_ip()));
+// return JSONObject.toJSONString(ntcP2PLog);
+// } catch (Exception e) {
+// e.printStackTrace();
+// logger.error("Log parsing error!!! \n" + message);
+// return "";
+// }
+// }
+//
+//
+// private static String streamMediaLog(String message) {
+// try {
+// NTC_STREAMING_MEDIA_LOG ntcStreamingMediaLog = JSONObject.parseObject(message, NTC_STREAMING_MEDIA_LOG.class);
+// String sIp = ntcStreamingMediaLog.getS_ip();
+// String dIp = ntcStreamingMediaLog.getD_ip();
+// ntcStreamingMediaLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcStreamingMediaLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcStreamingMediaLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcStreamingMediaLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// ntcStreamingMediaLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcStreamingMediaLog.getS_ip()));
+//// ntcStreamingMediaLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcStreamingMediaLog.getD_ip()));
+// return JSONObject.toJSONString(ntcStreamingMediaLog);
+// } catch (Exception e) {
+// e.printStackTrace();
+// logger.error("Log parsing error!!! \n" + message);
+// return "";
+// }
+// }
+//
+// private static String voipLog(String message) {
+// try {
+// NTC_VOIP_LOG ntcVoipLog = JSONObject.parseObject(message, NTC_VOIP_LOG.class);
+// String sIp = ntcVoipLog.getS_ip();
+// String dIp = ntcVoipLog.getD_ip();
+// ntcVoipLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcVoipLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcVoipLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcVoipLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// ntcVoipLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcVoipLog.getS_ip()));
+//// ntcVoipLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcVoipLog.getD_ip()));
+// return JSONObject.toJSONString(ntcVoipLog);
+// } catch (Exception e) {
+// e.printStackTrace();
+// logger.error("Log parsing error!!! \n" + message);
+// return "";
+// }
+// }
+//
+//
+// private static String sslLog(String message) {
+// try {
+// NTC_COLLECT_SSL_LOG ntcSslLog = JSONObject.parseObject(message, NTC_COLLECT_SSL_LOG.class);
+// String sIp = ntcSslLog.getS_ip();
+// String dIp = ntcSslLog.getD_ip();
+// ntcSslLog.setServer_locate(ipLookup.countryLookup(dIp));
+// ntcSslLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// ntcSslLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// ntcSslLog.setS_asn(ipLookup.asnLookup(sIp, true));
+//// ntcSslLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcSslLog.getS_ip()));
+//// ntcSslLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcSslLog.getD_ip()));
+// return JSONObject.toJSONString(ntcSslLog);
+// } catch (Exception e) {
+// e.printStackTrace();
+// logger.error("Log parsing error!!! \n" + message);
+// return "";
+// }
+// }
+//
+// private static String pHttpLog(String message) {
+// try {
+// PXY_HTTP_LOG pxyHttpLog = JSONObject.parseObject(message, PXY_HTTP_LOG.class);
+// String sIp = pxyHttpLog.getS_ip();
+// String dIp = pxyHttpLog.getD_ip();
+// pxyHttpLog.setServer_locate(ipLookup.countryLookup(dIp));
+// pxyHttpLog.setClient_locate(ipLookup.cityLookupDetail(sIp));
+// pxyHttpLog.setD_asn(ipLookup.asnLookup(dIp, true));
+// pxyHttpLog.setS_asn(ipLookup.asnLookup(sIp, true));
+// pxyHttpLog.setWebsite(StringUtil.getDomain(pxyHttpLog.getUrl()));
+// return JSONObject.toJSONString(pxyHttpLog);
+// } catch (Exception e) {
+// e.printStackTrace();
+// logger.error("Log parsing error!!! \n" + message);
+// return "";
+// }
+// }
+//
+//
+// /**
+// * ---------------------------------删除的topic------------------------------------------------------------
+// **/
+//
+//
+// private static String behaviorLog(String message) {
+// try {
+// DK_BEHAVIOR_LOG dkBehaviorLog = JSONObject.parseObject(message, DK_BEHAVIOR_LOG.class, Feature.OrderedField);
+// dkBehaviorLog.setServer_locate(ipLookup.countryLookup(dkBehaviorLog.getD_ip()));
+// dkBehaviorLog.setClient_locate(ipLookup.cityLookupDetail(dkBehaviorLog.getS_ip()));
+// dkBehaviorLog.setD_asn(ipLookup.asnLookup(dkBehaviorLog.getD_ip()).trim());
+// dkBehaviorLog.setS_asn(ipLookup.asnLookup(dkBehaviorLog.getS_ip()).trim());
+//// dkBehaviorLog.setS_subscribe_id(TransFormUtils.getSubscribe(dkBehaviorLog.getS_ip()));
+//// dkBehaviorLog.setD_subscribe_id(TransFormUtils.getSubscribe(dkBehaviorLog.getD_ip()));
+// return JSONObject.toJSONString(dkBehaviorLog);
+// } catch (Exception e) {
+// logger.error("Log parsing error!!! \n" + message);
+// e.printStackTrace();
+// return "";
+// }
+// }
+//
+// private static String sshLog(String message) {
+// try {
+// NTC_SSH_LOG ntcSshLog = JSONObject.parseObject(message, NTC_SSH_LOG.class, Feature.OrderedField);
+// ntcSshLog.setServer_locate(ipLookup.countryLookup(ntcSshLog.getD_ip()));
+// ntcSshLog.setClient_locate(ipLookup.cityLookupDetail(ntcSshLog.getS_ip()));
+// ntcSshLog.setD_asn(ipLookup.asnLookup(ntcSshLog.getD_ip()).trim());
+// ntcSshLog.setS_asn(ipLookup.asnLookup(ntcSshLog.getS_ip()).trim());
+// try {
+// ntcSshLog.setVersion(ntcSshLog.getVersion().replaceAll("\n", "\\\\n"));
+// ntcSshLog.setVersion(ntcSshLog.getVersion().replaceAll("\r", "\\\\r"));
+// } catch (Exception e) {
+// ntcSshLog.setVersion("");
+// }
+//// ntcSshLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcSshLog.getS_ip()));
+//// ntcSshLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcSshLog.getD_ip()));
+// return JSONObject.toJSONString(ntcSshLog);
+// } catch (Exception e) {
+// e.printStackTrace();
+// logger.error("Log parsing error!!! \n" + message);
+// return "";
+// }
+// }
+//
+// private static String pptpLog(String message) {
+// try {
+// NTC_PPTP_LOG ntcPptpLog = JSONObject.parseObject(message, NTC_PPTP_LOG.class, Feature.OrderedField);
+// ntcPptpLog.setServer_locate(ipLookup.countryLookup(ntcPptpLog.getD_ip()));
+// ntcPptpLog.setClient_locate(ipLookup.cityLookupDetail(ntcPptpLog.getS_ip()));
+// ntcPptpLog.setD_asn(ipLookup.asnLookup(ntcPptpLog.getD_ip()).trim());
+// ntcPptpLog.setS_asn(ipLookup.asnLookup(ntcPptpLog.getS_ip()).trim());
+//// ntcPptpLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcPptpLog.getS_ip()));
+//// ntcPptpLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcPptpLog.getD_ip()));
+// return JSONObject.toJSONString(ntcPptpLog);
+// } catch (Exception e) {
+// e.printStackTrace();
+// logger.error("Log parsing error!!! \n" + message);
+// return "";
+// }
+// }
+//
+// private static String l2tpLog(String message) {
+// try {
+// NTC_L2TP_LOG ntcL2TPLog = JSONObject.parseObject(message, NTC_L2TP_LOG.class, Feature.OrderedField);
+// ntcL2TPLog.setServer_locate(ipLookup.countryLookup(ntcL2TPLog.getD_ip()));
+// ntcL2TPLog.setClient_locate(ipLookup.cityLookupDetail(ntcL2TPLog.getS_ip()));
+// ntcL2TPLog.setD_asn(ipLookup.asnLookup(ntcL2TPLog.getD_ip()).trim());
+// ntcL2TPLog.setS_asn(ipLookup.asnLookup(ntcL2TPLog.getS_ip()).trim());
+//// ntcL2TPLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcL2TPLog.getS_ip()));
+//// ntcL2TPLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcL2TPLog.getD_ip()));
+// return JSONObject.toJSONString(ntcL2TPLog);
+// } catch (Exception e) {
+// e.printStackTrace();
+// logger.error("Log parsing error!!! \n" + message);
+// return "";
+// }
+// }
+//
+// private static String ipsecLog(String message) {
+// try {
+// NTC_IPSEC_LOG ntcIpsecLog = JSONObject.parseObject(message, NTC_IPSEC_LOG.class, Feature.OrderedField);
+// ntcIpsecLog.setServer_locate(ipLookup.countryLookup(ntcIpsecLog.getD_ip()));
+// ntcIpsecLog.setClient_locate(ipLookup.cityLookupDetail(ntcIpsecLog.getS_ip()));
+// ntcIpsecLog.setD_asn(ipLookup.asnLookup(ntcIpsecLog.getD_ip()).trim());
+// ntcIpsecLog.setS_asn(ipLookup.asnLookup(ntcIpsecLog.getS_ip()).trim());
+//// ntcIpsecLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcIpsecLog.getS_ip()));
+//// ntcIpsecLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcIpsecLog.getD_ip()));
+// return JSONObject.toJSONString(ntcIpsecLog);
+// } catch (Exception e) {
+// e.printStackTrace();
+// logger.error("Log parsing error!!! \n" + message);
+// return "";
+// }
+// }
+//
+//
+//}
diff --git a/src/main/java/cn/ac/iie/utils/redis/RedisPollUtils.java b/src/main/java/cn/ac/iie/utils/redis/RedisPollUtils.java
new file mode 100644
index 0000000..23407dd
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/redis/RedisPollUtils.java
@@ -0,0 +1,61 @@
+package cn.ac.iie.utils.redis;
+
+import org.apache.log4j.Logger;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPoolConfig;
+
+import java.io.IOException;
+import java.util.LinkedHashSet;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Redis连接池
+ *
+ * @author my
+ * @date 2018-07-04
+ */
+public final class RedisPollUtils {
+ private static final Logger logger = Logger.getLogger(RedisPollUtils.class);
+ private static JedisCluster jedisCluster;
+ private static Properties props = new Properties();
+
+ /**
+ * 不允许通过new创建该类的实例
+ */
+ private RedisPollUtils() {
+ }
+
+ /**
+ * 初始化Redis连接池
+ */
+ public static JedisCluster getJedisCluster() {
+ try {
+ String redisConfigFile = "redis_aaa_config.properties";
+ props.load(RedisPollUtils.class.getClassLoader().getResourceAsStream(redisConfigFile));
+ } catch (IOException e) {
+ logger.error("Properties Initialization Failed!!!!");
+ e.printStackTrace();
+ }
+ JedisPoolConfig poolConfig = new JedisPoolConfig();
+ poolConfig.setMaxTotal(Integer.valueOf(props.getProperty("jedis.pool.maxActive")));
+ poolConfig.setMaxIdle(Integer.valueOf(props.getProperty("jedis.pool.maxIdle")));
+ poolConfig.setMaxWaitMillis(Long.valueOf(props.getProperty("jedis.pool.maxWait")));
+ poolConfig.setTestOnReturn(Boolean.valueOf(props.getProperty("jedis.pool.testOnReturn")));
+ poolConfig.setTestOnBorrow(Boolean.valueOf(props.getProperty("jedis.pool.testOnBorrow")));
+ Set<HostAndPort> nodes = new LinkedHashSet<HostAndPort>();
+ for (String port : props.getProperty("redis.port").split(",")) {
+ nodes.add(new HostAndPort(props.getProperty("redis.ip1"), Integer.parseInt(port)));
+ nodes.add(new HostAndPort(props.getProperty("redis.ip2"), Integer.parseInt(port)));
+ nodes.add(new HostAndPort(props.getProperty("redis.ip3"), Integer.parseInt(port)));
+ nodes.add(new HostAndPort(props.getProperty("redis.ip4"), Integer.parseInt(port)));
+ nodes.add(new HostAndPort(props.getProperty("redis.ip5"), Integer.parseInt(port)));
+ }
+ if (jedisCluster == null) {
+ jedisCluster = new JedisCluster(nodes, poolConfig);
+ }
+ return jedisCluster;
+ }
+
+}
diff --git a/src/main/java/cn/ac/iie/utils/redis/RedisUrlPollUtils.java b/src/main/java/cn/ac/iie/utils/redis/RedisUrlPollUtils.java
new file mode 100644
index 0000000..98cd1be
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/redis/RedisUrlPollUtils.java
@@ -0,0 +1,61 @@
+package cn.ac.iie.utils.redis;
+
+import org.apache.log4j.Logger;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPoolConfig;
+
+import java.util.LinkedHashSet;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Redis连接池
+ *
+ * @author my
+ * @date 2018-07-04
+ */
+public class RedisUrlPollUtils {
+ private static final Logger logger = Logger.getLogger(RedisUrlPollUtils.class);
+ private static String redisConfigFile = "redis_url_config.properties";
+ private static JedisCluster jedisCluster;
+ private static Properties props = new Properties();
+
+ /**
+ * 不允许通过new创建该类的实例
+ */
+ private RedisUrlPollUtils() {
+ }
+
+ /**
+ * 初始化Redis连接池
+ */
+ public static JedisCluster getJedisCluster() {
+ try {
+ props.load(RedisUrlPollUtils.class.getClassLoader().getResourceAsStream(redisConfigFile));
+ JedisPoolConfig poolConfig = new JedisPoolConfig();
+ poolConfig.setMaxTotal(Integer.valueOf(props.getProperty("jedis.pool.maxActive")));
+ poolConfig.setMaxIdle(Integer.valueOf(props.getProperty("jedis.pool.maxIdle")));
+ poolConfig.setMaxWaitMillis(Long.valueOf(props.getProperty("jedis.pool.maxWait")));
+ poolConfig.setTestOnBorrow(Boolean.valueOf(props.getProperty("jedis.pool.testOnBorrow")));
+ poolConfig.setTestOnReturn(Boolean.valueOf(props.getProperty("jedis.pool.testOnReturn")));
+ Set<HostAndPort> nodes = new LinkedHashSet<HostAndPort>();
+ for (String port : props.getProperty("redis.port").split(",")) {
+ nodes.add(new HostAndPort(props.getProperty("redis.ip1"), Integer.parseInt(port)));
+ nodes.add(new HostAndPort(props.getProperty("redis.ip2"), Integer.parseInt(port)));
+ nodes.add(new HostAndPort(props.getProperty("redis.ip3"), Integer.parseInt(port)));
+ nodes.add(new HostAndPort(props.getProperty("redis.ip4"), Integer.parseInt(port)));
+ nodes.add(new HostAndPort(props.getProperty("redis.ip5"), Integer.parseInt(port)));
+ }
+ if (jedisCluster == null) {
+ jedisCluster = new JedisCluster(nodes, poolConfig);
+ }
+ return jedisCluster;
+ } catch (Exception e) {
+ logger.error(("JedisCluster Connection creation Failed!!!") + e);
+ e.printStackTrace();
+ return null;
+ }
+ }
+
+}