diff options
Diffstat (limited to 'src/main/java/cn/ac/iie/utils')
11 files changed, 2362 insertions, 0 deletions
diff --git a/src/main/java/cn/ac/iie/utils/CSVAlarm.java b/src/main/java/cn/ac/iie/utils/CSVAlarm.java new file mode 100644 index 0000000..fd64b5f --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/CSVAlarm.java @@ -0,0 +1,77 @@ +package cn.ac.iie.utils; + +import cn.ac.iie.common.HttpManager; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.entity.StringEntity; +import org.apache.http.util.EntityUtils; +import org.apache.log4j.Logger; + +public class CSVAlarm { + //类初始化时,自动实例化,饿汉单例模式 + private static final CSVAlarm csvAlarm = new CSVAlarm(); + private static Logger logger = Logger.getLogger(CSVAlarm.class); + + public static CSVAlarm getInfoLoadInstance(){ + return csvAlarm; + } + + //私有构造方法,防止外部构建 + private CSVAlarm(){ + } + + public void csvDataLoad(String url, String topicNmane, String data) throws Exception { + // http client + DefaultHttpClient httpClient = new DefaultHttpClient(); + String topic = topicNmane; + try { + // 需要发送的数据 + String msg = data;//每条数据内部"\t"分割,数据间"\n"分割;//自己又换了分隔符:每条数据内部"#"分割,数据间"$"分割; + //开始结束时间 +// long start = System.currentTimeMillis(); + HttpPost request = new HttpPost(url); + //用户名与密码,用于权限控制,传输时加密 +// request.addHeader("User", "LiMing"); +// request.addHeader("Password", "123"); + //指定使用topic 自动绑定对应schema + request.addHeader("Topic", topic); + //Schema-Version可选,不填或为空默认使用最新版本的schema + request.addHeader("Schema-Version", "2"); + //csv 或者 avro,大小写不敏感 + request.addHeader("Format", "csv"); + //行列分隔符默认为下值 +// request.addHeader("Row-Split", "\\n"); +// request.addHeader("Field-Split", "\\t"); + request.addHeader("Row-Split", "\\n"); + request.addHeader("Field-Split", ","); + StringEntity payload = new StringEntity(msg); + request.setEntity(payload); + HttpResponse response = httpClient.execute(request); + try { + int statuCode = response.getStatusLine().getStatusCode(); + HttpEntity entity = response.getEntity(); + if (statuCode == 200) { + logger.info("数据中心加载成功, 返回码: " + statuCode); + System.out.println("数据中心加载成功, 返回码: " + statuCode); + EntityUtils.consume(entity); + } else { + String ret = EntityUtils.toString(entity); + EntityUtils.consume(entity); + logger.info("数据中心加载失败: " + ret + " --- code: " + statuCode + " ---失败数据为: \n" + data); + System.out.println("数据中心加载失败: " + ret + " --- code: " + statuCode + " ---失败数据为: \n" + data); + logger.error("数据中心加载失败: " + ret + " --- code: " + statuCode); + System.out.println("数据中心加载失败: " + ret + " --- code: " + statuCode); + } + } catch (Exception e) { + e.printStackTrace(); + } + } catch (Exception ex) { + // handle exception + ex.printStackTrace(); + } finally { + httpClient.getConnectionManager().shutdown(); //Deprecated + } + } +}
\ No newline at end of file diff --git a/src/main/java/cn/ac/iie/utils/HiveDao/HdfsDataLoad_Avro.java b/src/main/java/cn/ac/iie/utils/HiveDao/HdfsDataLoad_Avro.java new file mode 100644 index 0000000..152bc4d --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/HiveDao/HdfsDataLoad_Avro.java @@ -0,0 +1,240 @@ +package cn.ac.iie.utils.HiveDao; + +import cn.ac.iie.bean.voipSipOrigin.SipOriginALL; +import cn.ac.iie.common.RealtimeCountConfig; +import com.alibaba.druid.pool.DruidDataSource; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.utils.StringUtil; +import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.Logger; + +import java.net.URI; +import java.sql.Connection; +import java.sql.Statement; +import java.util.LinkedList; +import java.util.UUID; + +/** + * HDFS-Hive-Avro格式载入类 + * + * @author Colbert + */ +public class HdfsDataLoad_Avro { + + private static Logger logger = Logger.getLogger(HdfsDataLoad_Avro.class); + private static DruidDataSource ds = HiveDataSourceUtil.getHiveDataSource(); + private Connection con = null; + private Statement stmt = null; + private FSDataOutputStream outputStream_avro = null; + + private DataFileWriter<GenericRecord> fileWriter = null; + + private static HdfsDataLoad_Avro hdfsDataLoad_avro = null; + private static FileSystem fileSystem = null; + + private HdfsDataLoad_Avro() { + getHdfsConnection(); + } + + public static HdfsDataLoad_Avro getHdfsInstance() { + if (hdfsDataLoad_avro == null || fileSystem == null) { + hdfsDataLoad_avro = new HdfsDataLoad_Avro(); + } + return hdfsDataLoad_avro; + } + + /** + * 创建HDFS连接 + */ + private void getHdfsConnection() { + try { + Configuration conf = new Configuration(); + conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); + conf.setBoolean("dfs.support.append", true); + conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER"); + conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", true); + fileSystem = FileSystem.get(new URI(RealtimeCountConfig.HDFS_URL), conf, RealtimeCountConfig.HDFS_USER); + } catch (Exception e) { + logger.error("HdfsDataLoad_Avro getHdfsConnection method is error !!!--->{" + e + "}<---"); + e.printStackTrace(); + } + } + + + public void dataSipToHdfsAvro(String partition, LinkedList<String> data, String topicName, String logType, Long timeSend) { + long time = timeSend; + String uuid = UUID.randomUUID().toString().replaceAll("-", "").toLowerCase(); + String hdfs_path = RealtimeCountConfig.HDFS_URL + RealtimeCountConfig.HDFS_PATH + topicName.toLowerCase() + "/" + partition + "/" + topicName.toLowerCase() + "-" + uuid + ".avro";//格式 hdfs://ns1/input/frag-media-expire-log/20190730/frag-media-expire-log-d84772e8257048f3be1ca82f8e35f215.avro + try { + Path path = new Path(hdfs_path); + fileSystem.createNewFile(path); + outputStream_avro = fileSystem.append(path); + + switch (logType) { + case "origin": + String schemaJsonSip = "{\"type\": \"record\",\"name\": \"siplog\",\"fields\": [{\"name\": \"call_id\", \"type\": [\"string\", \"null\"]},{\"name\": \"clj_ip\", \"type\": [\"string\", \"null\"]},{\"name\": \"found_time\", \"type\": \"int\"},{\"name\": \"src_ip\", \"type\": [\"string\", \"null\"]},{\"name\": \"src_location_nation\", \"type\": [\"string\", \"null\"]},{\"name\": \"src_location_nation_code\", \"type\": [\"string\", \"null\"]},{\"name\": \"src_location_region\", \"type\": [\"string\", \"null\"]},{\"name\": \"src_port\", \"type\": \"int\"},{\"name\": \"dst_ip\", \"type\": [\"string\", \"null\"]},{\"name\": \"ip_type\", \"type\": [\"string\", \"null\"]},{\"name\": \"dst_location_nation\", \"type\": [\"string\", \"null\"]},{\"name\": \"dst_location_nation_code\", \"type\": [\"string\", \"null\"]},{\"name\": \"dst_location_region\", \"type\": [\"string\", \"null\"]},{\"name\": \"dst_port\", \"type\": \"int\"},{\"name\": \"method\", \"type\": [\"string\", \"null\"]},{\"name\": \"request_uri\", \"type\": [\"string\", \"null\"]},{\"name\": \"user_name\", \"type\": [\"string\", \"null\"]},{\"name\": \"service_domain\", \"type\": [\"string\", \"null\"]},{\"name\": \"service_domain_valid\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_stat\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_stat_format\", \"type\": [\"string\", \"null\"]},{\"name\": \"from\", \"type\": [\"string\", \"null\"]},{\"name\": \"from_nickname\", \"type\": [\"string\", \"null\"]},{\"name\": \"from_usr_name\", \"type\": [\"string\", \"null\"]},{\"name\": \"from_ser_domain\", \"type\": [\"string\", \"null\"]},{\"name\": \"from_ser_domain_valid\", \"type\": [\"string\", \"null\"]},{\"name\": \"from_tag\", \"type\": [\"string\", \"null\"]},{\"name\": \"to\", \"type\": [\"string\", \"null\"]},{\"name\": \"to_nickname\", \"type\": [\"string\", \"null\"]},{\"name\": \"to_usr_name\", \"type\": [\"string\", \"null\"]},{\"name\": \"to_ser_domain\", \"type\": [\"string\", \"null\"]},{\"name\": \"to_ser_domain_valid\", \"type\": [\"string\", \"null\"]},{\"name\": \"to_tag\", \"type\": [\"string\", \"null\"]},{\"name\": \"cseq\", \"type\": [\"string\", \"null\"]},{\"name\": \"cseq_method\", \"type\": [\"string\", \"null\"]},{\"name\": \"user_agent\", \"type\": [\"string\", \"null\"]},{\"name\": \"device_type\", \"type\": [\"string\", \"null\"]},{\"name\": \"max_forwards\", \"type\": [\"string\", \"null\"]},{\"name\": \"server\", \"type\": [\"string\", \"null\"]},{\"name\": \"server_type\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_via_json\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_contact\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_contact_nickname\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_contact_usr_name\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_contact_ser_domain\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_ser_domain_valid\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_record_route_json\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_route_json\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_expires\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_others\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_content_type\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_content\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_via_json\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_contact\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_contact_nickname\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_contact_usr_name\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_contact_ser_domain\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_ser_domain_valid\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_record_route_json\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_route_json\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_expires\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_others\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_content_type\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_content\", \"type\": [\"string\", \"null\"]},{\"name\": \"req_coding\", \"type\": [\"string\", \"null\"]},{\"name\": \"res_coding\", \"type\": [\"string\", \"null\"]},{\"name\": \"stat_time\", \"type\": \"int\"}]}"; + Schema schemaSip = new Schema.Parser().parse(schemaJsonSip); + fileWriter = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schemaSip)).setSyncInterval(100); + fileWriter.setCodec(CodecFactory.snappyCodec()); + fileWriter.create(schemaSip, outputStream_avro); + for (String dataJson : data) { + SipOriginALL sipOriginAllLog = JSONObject.parseObject(dataJson, SipOriginALL.class); + sipOriginAllLog.setStat_time(time); + GenericRecord recordSip = new GenericData.Record(schemaSip); + + recordSip.put("call_id", sipOriginAllLog.getCall_ID()); + recordSip.put("clj_ip", sipOriginAllLog.getCLJ_IP()); + recordSip.put("found_time", sipOriginAllLog.getFound_Time()); + recordSip.put("src_ip", sipOriginAllLog.getSRC_IP()); + recordSip.put("src_location_nation", sipOriginAllLog.getSRC_LOCATION_NATION()); + recordSip.put("src_location_nation_code", sipOriginAllLog.getSRC_LOCATION_NATION_CODE()); + recordSip.put("src_location_region", sipOriginAllLog.getSRC_LOCATION_REGION()); + recordSip.put("src_port", sipOriginAllLog.getSRC_PORT()); + recordSip.put("dst_ip", sipOriginAllLog.getDST_IP()); + recordSip.put("ip_type", sipOriginAllLog.getIP_TYPE()); + recordSip.put("dst_location_nation", sipOriginAllLog.getDST_LOCATION_NATION()); + recordSip.put("dst_location_nation_code", sipOriginAllLog.getDST_LOCATION_NATION_CODE()); + recordSip.put("dst_location_region", sipOriginAllLog.getDST_LOCATION_REGION()); + recordSip.put("dst_port", sipOriginAllLog.getDST_PORT()); + recordSip.put("method", sipOriginAllLog.getMethod()); + recordSip.put("request_uri", sipOriginAllLog.getRequest_URI()); + recordSip.put("user_name", sipOriginAllLog.getUser_name()); + recordSip.put("service_domain", sipOriginAllLog.getService_domain()); + recordSip.put("service_domain_valid", sipOriginAllLog.getService_domain_valid()); + recordSip.put("res_stat", sipOriginAllLog.getRes_stat()); + recordSip.put("res_stat_format", sipOriginAllLog.getRes_stat_format()); + recordSip.put("from", sipOriginAllLog.getFrom()); + recordSip.put("from_nickname", sipOriginAllLog.getFrom_Nickname()); + recordSip.put("from_usr_name", sipOriginAllLog.getFrom_usr_name()); + recordSip.put("from_ser_domain", sipOriginAllLog.getFrom_ser_domain()); + recordSip.put("from_ser_domain_valid", sipOriginAllLog.getFrom_ser_domain_valid()); + recordSip.put("from_tag", sipOriginAllLog.getFrom_tag()); + recordSip.put("to", sipOriginAllLog.getTo()); + recordSip.put("to_nickname", sipOriginAllLog.getTo_Nickname()); + recordSip.put("to_usr_name", sipOriginAllLog.getTo_usr_name()); + recordSip.put("to_ser_domain", sipOriginAllLog.getTo_ser_domain()); + recordSip.put("to_ser_domain_valid", sipOriginAllLog.getTo_ser_domain_valid()); + recordSip.put("to_tag", sipOriginAllLog.getTo_tag()); + recordSip.put("cseq", sipOriginAllLog.getCseq()); + recordSip.put("cseq_method", sipOriginAllLog.getCseq_method()); + recordSip.put("user_agent", sipOriginAllLog.getUser_Agent()); + recordSip.put("device_type", sipOriginAllLog.getDevice_type()); + recordSip.put("max_forwards", sipOriginAllLog.getMax_Forwards()); + recordSip.put("server", sipOriginAllLog.getServer()); + recordSip.put("server_type", sipOriginAllLog.getServer_type()); + recordSip.put("req_via_json", sipOriginAllLog.getReq_Via_Json()); + recordSip.put("req_contact", sipOriginAllLog.getReq_Contact()); + recordSip.put("req_contact_nickname", sipOriginAllLog.getReq_Contact_Nickname()); + recordSip.put("req_contact_usr_name", sipOriginAllLog.getReq_Contact_usr_name()); + recordSip.put("req_contact_ser_domain", sipOriginAllLog.getReq_Contact_ser_domain()); + recordSip.put("req_ser_domain_valid", sipOriginAllLog.getReq_ser_domain_valid()); + recordSip.put("req_record_route_json", sipOriginAllLog.getReq_Record_Route_Json()); + recordSip.put("req_route_json", sipOriginAllLog.getReq_Route_Json()); + recordSip.put("req_expires", sipOriginAllLog.getReq_Expires()); + recordSip.put("req_others", sipOriginAllLog.getReq_Others()); + recordSip.put("req_content_type", sipOriginAllLog.getReq_Content_Type()); + recordSip.put("req_content", sipOriginAllLog.getReq_Content()); + recordSip.put("res_via_json", sipOriginAllLog.getRes_Via_Json()); + recordSip.put("res_contact", sipOriginAllLog.getRes_Contact()); + recordSip.put("res_contact_nickname", sipOriginAllLog.getRes_Contact_Nickname()); + recordSip.put("res_contact_usr_name", sipOriginAllLog.getRes_Contact_usr_name()); + recordSip.put("res_contact_ser_domain", sipOriginAllLog.getRes_Contact_ser_domain()); + recordSip.put("res_ser_domain_valid", sipOriginAllLog.getRes_ser_domain_valid()); + recordSip.put("res_record_route_json", sipOriginAllLog.getRes_Record_Route_Json()); + recordSip.put("res_route_json", sipOriginAllLog.getRes_Route_Json()); + recordSip.put("res_expires", sipOriginAllLog.getRes_Expires()); + recordSip.put("res_others", sipOriginAllLog.getRes_Others()); + recordSip.put("res_content_type", sipOriginAllLog.getRes_Content_Type()); + recordSip.put("res_content", sipOriginAllLog.getRes_Content()); + recordSip.put("req_coding", sipOriginAllLog.getReq_coding()); + recordSip.put("res_coding", sipOriginAllLog.getRes_coding()); + recordSip.put("stat_time", sipOriginAllLog.getStat_time()); + + fileWriter.append(recordSip); + fileWriter.flush(); + } + break; + default: + logger.error("HdfsDataLoad_Avro toHdfs logType is error !!!This logType is--->{" + logType + "}<---"); + break; + } + + logger.warn("HdfsDataLoad_Avro data to HDFS Successful,hdfs_path is -->{" + hdfs_path + "}<---"); + + if (fileWriter != null) { + fileWriter.close(); + fileWriter = null; + } + if (outputStream_avro != null) { + outputStream_avro.close(); + outputStream_avro = null; + } + + switch (logType) { + case "origin": + String tablenameSip = RealtimeCountConfig.HIVE_SIP_CLEAN_TABLE; + loadDataToHiveAvro(tablenameSip, partition, hdfs_path); + break; +// case "route": +// String tablenameFrag = RealtimeCountConfig.HIVE_SIP_ROUTE_TABLE; +// loadDataToHiveAvro(tablenameFrag, partition, hdfs_path); +// break; + default: + logger.error("HdfsDataLoad_Avro toHive logType is error !!!This logType is--->{" + logType + "}<---"); + break; + } + + } catch (Exception e) { + logger.error("HdfsDataLoad_Avro dataToHdfs method is error !!!--->{" + e + "}<---"); + e.printStackTrace(); + } + } + + public void loadDataToHiveAvro(String tablename, String partition, String hdfs_path) { + String dataUrl = hdfs_path; + StringBuffer sb = new StringBuffer(); + try { + con = ds.getConnection(); + stmt = con.createStatement(); + sb.append("load data inpath ").append("'").append(dataUrl).append("'") + .append(" into table ").append(tablename).append(" partition( time_partition=").append(partition).append(")"); + stmt.execute(sb.toString()); + + logger.warn("HdfsDataLoad_Avro data to Hive Successful,dataUrl is -->{" + dataUrl + "}<---"); + + } catch (Exception e) { + logger.error("HdfsDataLoad_Avro loadDataToHive method is error !!!--->{" + e + "}<---"); + e.printStackTrace(); + } finally { + try { + if (stmt != null) { + stmt.close(); + stmt = null; + } + if (con != null) { + con.close(); + con = null; + } + } catch (Exception e) { + logger.error("HdfsDataLoad_Avro loadDataToHive when close is error !!!--->{" + e + "}<---"); + e.printStackTrace(); + } + } + } + + private String strDefaultValue(String str) { + return (StringUtil.isBlank(str) + || str == null + || str.equals("") + || str.length() == 0) ? "$#$" : str; + } +} diff --git a/src/main/java/cn/ac/iie/utils/HiveDao/HiveDataSourceUtil.java b/src/main/java/cn/ac/iie/utils/HiveDao/HiveDataSourceUtil.java new file mode 100644 index 0000000..9223f92 --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/HiveDao/HiveDataSourceUtil.java @@ -0,0 +1,187 @@ +package cn.ac.iie.utils.HiveDao; + +import com.alibaba.druid.pool.DruidDataSource; +import com.alibaba.fastjson.JSONException; +import com.alibaba.fastjson.JSONObject; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import net.sf.json.JSONArray; +import org.apache.log4j.Logger; + +import java.sql.*; +import java.util.Properties; + +/** + * Hive-JDBC连接池 + * + * @author Colbert + */ +public class HiveDataSourceUtil { + private static DruidDataSource hiveDataSource = new DruidDataSource(); + public static Connection conn = null; + private static final Logger logger = Logger.getLogger(HiveDataSourceUtil.class); + + public static DruidDataSource getHiveDataSource() { + if (hiveDataSource.isInited()) { + return hiveDataSource; + } + + try { + Properties dsProp = new Properties(); + dsProp.load(HiveDataSourceUtil.class.getClassLoader().getResourceAsStream("hive.properties")); + hiveDataSource.setDriverClassName(dsProp.getProperty("hive_jdbc_drivers")); + //基本属性 url、user、password + hiveDataSource.setUrl(dsProp.getProperty("hive_jdbc_url")); + hiveDataSource.setUsername(dsProp.getProperty("hive_jdbc_username")); + hiveDataSource.setPassword(dsProp.getProperty("hive_jdbc_password")); + + //配置初始化大小、最小、最大 + hiveDataSource.setInitialSize(Integer.parseInt(dsProp.getProperty("hive_initialSize"))); + hiveDataSource.setMinIdle(Integer.parseInt(dsProp.getProperty("hive_minIdle"))); + hiveDataSource.setMaxActive(Integer.parseInt(dsProp.getProperty("hive_maxActive"))); + + //配置获取连接等待超时的时间 + hiveDataSource.setMaxWait(Integer.parseInt(dsProp.getProperty("hive_maxWait"))); + + //配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 + hiveDataSource.setTimeBetweenEvictionRunsMillis(60000); + + //配置一个连接在池中最小生存的时间,单位是毫秒 + hiveDataSource.setMinEvictableIdleTimeMillis(300000); + + hiveDataSource.setValidationQuery("SELECT 1"); + hiveDataSource.setTestWhileIdle(true); + hiveDataSource.setTestOnBorrow(true); +// hiveDataSource.setKeepAlive(true); + + //打开PSCache,并且指定每个连接上PSCache的大小 + hiveDataSource.setPoolPreparedStatements(true); + hiveDataSource.setMaxPoolPreparedStatementPerConnectionSize(20); + + //配置监控统计拦截的filters +// hiveDataSource.setFilters("stat"); + + hiveDataSource.init(); + } catch (Exception e) { + e.printStackTrace(); + closeHiveDataSource(); + } + return hiveDataSource; + } + + /** + * @Description:关闭Hive连接池 + */ + public static void closeHiveDataSource() { + if (hiveDataSource != null) { + hiveDataSource.close(); + } + } + + /** + * @return + * @Description:获取Hive连接 + */ + public static Connection getHiveConn() { + try { + hiveDataSource = getHiveDataSource(); + conn = hiveDataSource.getConnection(); + } catch (Exception e) { + logger.error("HiveDataSourceUtil--" + e + ":获取Hive连接失败!"); + } + return conn; + } + + /** + * @Description:关闭Hive数据连接 + */ + public static void closeConn() { + try { + if (conn != null) { + conn.close(); + } + } catch (Exception e) { + logger.error("HiveDataSourceUtil--" + e + ":关闭Hive-conn连接失败!"); + } + } + + + public static void main(String[] args) throws Exception { + DruidDataSource ds = HiveDataSourceUtil.getHiveDataSource(); + Connection conn = ds.getConnection(); + Statement stmt = null; + if (conn == null) { + System.out.println("null"); + } else { + System.out.println("conn"); + stmt = conn.createStatement(); + ResultSet res = stmt.executeQuery("select * from test.frag_media_expire_log limit 10"); + int i = 0; + while (res.next()) { + if (i < 10) { + System.out.println(res.getString(2)); + i++; + } + } +// String s = resultSetToJson(res); +// String s = ResultSetToJsonString(res); +// System.out.println(s); + } + + stmt.close(); + conn.close(); + } + + public static String resultSetToJson(ResultSet rs) throws SQLException, JSONException { + // json数组 + JSONArray array = new JSONArray(); + + // 获取列数 + ResultSetMetaData metaData = rs.getMetaData(); + int columnCount = metaData.getColumnCount(); + + // 遍历ResultSet中的每条数据 + while (rs.next()) { + JSONObject jsonObj = new JSONObject(); + + // 遍历每一列 + for (int i = 1; i <= columnCount; i++) { + String columnName = metaData.getColumnLabel(i); + String value = rs.getString(columnName); + jsonObj.put(columnName, value); + } +// array.put(jsonObj); + array.add(jsonObj); + } + + return array.toString(); + } + + public static final JsonObject ResultSetToJsonObject(ResultSet rs) { + JsonObject element = null; + JsonArray ja = new JsonArray(); + JsonObject jo = new JsonObject(); + ResultSetMetaData rsmd = null; + String columnName, columnValue = null; + try { + rsmd = rs.getMetaData(); + while (rs.next()) { + element = new JsonObject(); + for (int i = 0; i < rsmd.getColumnCount(); i++) { + columnName = rsmd.getColumnName(i + 1); + columnValue = rs.getString(columnName); + element.addProperty(columnName, columnValue); + } + ja.add(element); + } + jo.add("result", ja); + } catch (SQLException e) { + e.printStackTrace(); + } + return jo; + } + + public static final String ResultSetToJsonString(ResultSet rs) { + return ResultSetToJsonObject(rs).toString(); + } +} diff --git a/src/main/java/cn/ac/iie/utils/IPIPLibrary/Ipip.java b/src/main/java/cn/ac/iie/utils/IPIPLibrary/Ipip.java new file mode 100644 index 0000000..ec1febe --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/IPIPLibrary/Ipip.java @@ -0,0 +1,189 @@ +package cn.ac.iie.utils.IPIPLibrary; + +/** + * Created by lizhao on 2017-9-15. + */ +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import java.nio.charset.Charset; +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +public class Ipip { + + public static String randomIp() { + Random r = new Random(); + StringBuffer str = new StringBuffer(); + str.append(r.nextInt(1000000) % 255); + str.append("."); + str.append(r.nextInt(1000000) % 255); + str.append("."); + str.append(r.nextInt(1000000) % 255); + str.append("."); + str.append(0); + + return str.toString(); + } + + public static void main(String[] args){ + Ipip.load("E:\\ipip.dat"); + + Long st = System.nanoTime(); + for (int i = 0; i < 100; i++) + { + //IP.find(randomIp()); + System.out.println(Arrays.toString(Ipip.find(randomIp()))); + } + Long et = System.nanoTime(); + System.out.println((et - st) / 1000 / 1000); + +// System.out.println(Arrays.toString(IP.find("118.28.8.8"))); + } + + public static boolean enableFileWatch = false; + + private static int offset; + private static int[] index = new int[256]; + private static ByteBuffer dataBuffer; + private static ByteBuffer indexBuffer; + private static Long lastModifyTime = 0L; + private static File ipFile ; + private static ReentrantLock lock = new ReentrantLock(); + + public static void load(String filename) { + ipFile = new File(filename); + load(); + if (enableFileWatch) { + watch(); + } + } + + public static void load(String filename, boolean strict) throws Exception { + ipFile = new File(filename); + if (strict) { + int contentLength = Long.valueOf(ipFile.length()).intValue(); + if (contentLength < 512 * 1024) { + throw new Exception("ip data file error."); + } + } + load(); + if (enableFileWatch) { + watch(); + } + } + + public static String[] find(String ip) { + int ip_prefix_value = new Integer(ip.substring(0, ip.indexOf("."))); + long ip2long_value = ip2long(ip); + int start = index[ip_prefix_value]; + int max_comp_len = offset - 1028; + long index_offset = -1; + int index_length = -1; + byte b = 0; + for (start = start * 8 + 1024; start < max_comp_len; start += 8) { + if (int2long(indexBuffer.getInt(start)) >= ip2long_value) { + index_offset = bytesToLong(b, indexBuffer.get(start + 6), indexBuffer.get(start + 5), indexBuffer.get(start + 4)); + index_length = 0xFF & indexBuffer.get(start + 7); + break; + } + } + + byte[] areaBytes; + + lock.lock(); + try { + dataBuffer.position(offset + (int) index_offset - 1024); + areaBytes = new byte[index_length]; + dataBuffer.get(areaBytes, 0, index_length); + } finally { + lock.unlock(); + } + + return new String(areaBytes, Charset.forName("UTF-8")).split("\t", -1); + } + + private static void watch() { + Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + long time = ipFile.lastModified(); + if (time > lastModifyTime) { + lastModifyTime = time; + load(); + } + } + }, 1000L, 5000L, TimeUnit.MILLISECONDS); + } + + private static void load() { + lastModifyTime = ipFile.lastModified(); + FileInputStream fin = null; + lock.lock(); + try { + dataBuffer = ByteBuffer.allocate(Long.valueOf(ipFile.length()).intValue()); + fin = new FileInputStream(ipFile); + int readBytesLength; + byte[] chunk = new byte[4096]; + while (fin.available() > 0) { + readBytesLength = fin.read(chunk); + dataBuffer.put(chunk, 0, readBytesLength); + } + dataBuffer.position(0); + int indexLength = dataBuffer.getInt(); + byte[] indexBytes = new byte[indexLength]; + dataBuffer.get(indexBytes, 0, indexLength - 4); + indexBuffer = ByteBuffer.wrap(indexBytes); + indexBuffer.order(ByteOrder.LITTLE_ENDIAN); + offset = indexLength; + + int loop = 0; + while (loop++ < 256) { + index[loop - 1] = indexBuffer.getInt(); + } + indexBuffer.order(ByteOrder.BIG_ENDIAN); + } catch (IOException ioe) { + ioe.printStackTrace(); + } finally { + try { + if (fin != null) { + fin.close(); + } + } catch (IOException e){ + e.printStackTrace(); + } + lock.unlock(); + } + } + + private static long bytesToLong(byte a, byte b, byte c, byte d) { + return int2long((((a & 0xff) << 24) | ((b & 0xff) << 16) | ((c & 0xff) << 8) | (d & 0xff))); + } + + private static int str2Ip(String ip) { + String[] ss = ip.split("\\."); + int a, b, c, d; + a = Integer.parseInt(ss[0]); + b = Integer.parseInt(ss[1]); + c = Integer.parseInt(ss[2]); + d = Integer.parseInt(ss[3]); + return (a << 24) | (b << 16) | (c << 8) | d; + } + + private static long ip2long(String ip) { + return int2long(str2Ip(ip)); + } + + private static long int2long(int i) { + long l = i & 0x7fffffffL; + if (i < 0) { + l |= 0x080000000L; + } + return l; + } +}
\ No newline at end of file diff --git a/src/main/java/cn/ac/iie/utils/RealtimeCountConfigurations.java b/src/main/java/cn/ac/iie/utils/RealtimeCountConfigurations.java new file mode 100644 index 0000000..2a21798 --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/RealtimeCountConfigurations.java @@ -0,0 +1,67 @@ +package cn.ac.iie.utils; + +import java.util.Properties; + +//import com.nis.util.StringUtil; +public final class RealtimeCountConfigurations { + + private static Properties propCommon = new Properties();//0 + private static Properties propService = new Properties();//1 + + public static String getStringProperty(Integer type, String key) { + if(type == 0){ + return propCommon.getProperty(key); + } else if(type == 1){ + return propService.getProperty(key); + } else { + return null; + } + } + + + public static Integer getIntProperty(Integer type, String key) { + if(type == 0){ + return Integer.parseInt(propCommon.getProperty(key)); + } else if(type == 1){ + return Integer.parseInt(propService.getProperty(key)); + } else { + return null; + } + } + + public static Long getLongProperty(Integer type, String key) { + if(type == 0){ + return Long.parseLong(propCommon.getProperty(key)); + } else if(type == 1){ + return Long.parseLong(propService.getProperty(key)); + } else { + return null; + } + } + + public static Boolean getBooleanProperty(Integer type, String key) { + if(type == 0){ + return propCommon.getProperty(key).toLowerCase().trim().equals("true"); + } else if(type == 1){ + return propService.getProperty(key).toLowerCase().trim().equals("true"); + } else { + return null; + } + } + + static { + try { + propCommon.load(RealtimeCountConfigurations.class.getClassLoader().getResourceAsStream("realtime_config.properties"));//0 + propService.load(RealtimeCountConfigurations.class.getClassLoader().getResourceAsStream("storm_config.properties"));//1 + /*prop.load(new FileInputStream(System.getProperty("user.dir") + + File.separator + "config"+File.separator + "config.properties"));*/ + System.out.println("realtime_config.properties加载成功"); + System.out.println("storm_config.properties加载成功"); + + } catch (Exception e) { + propCommon = null; + propService = null; + System.err.println("RealtimeCountConfigurations配置文件加载失败"); + } + } +} diff --git a/src/main/java/cn/ac/iie/utils/TupleUtils.java b/src/main/java/cn/ac/iie/utils/TupleUtils.java new file mode 100644 index 0000000..1a5889a --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/TupleUtils.java @@ -0,0 +1,13 @@ +package cn.ac.iie.utils; + +import org.apache.storm.Constants; +import org.apache.storm.tuple.Tuple; + +public final class TupleUtils { + //判断是否系统自动发送的Tuple + public static boolean isTick(Tuple tuple) { + return tuple != null + && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent()) + && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId()); + } +} diff --git a/src/main/java/cn/ac/iie/utils/dao/ClickHouseUtils.java b/src/main/java/cn/ac/iie/utils/dao/ClickHouseUtils.java new file mode 100644 index 0000000..3dced6d --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/dao/ClickHouseUtils.java @@ -0,0 +1,47 @@ +package cn.ac.iie.utils.dao; + + +import com.zdjizhi.utils.StringUtil; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Timestamp; + +/** + * ClickHouse 入库类型转换类 + * + * @author Administrator + */ +public class ClickHouseUtils { + + public static void setInt(PreparedStatement pstms, int index, String str) { + try { + int num = 0; + if (str != null) { + num = Integer.parseInt(str); + } + pstms.setInt(index, num); + } catch (SQLException e) { + e.printStackTrace(); + } + } + + public static void setString(PreparedStatement pstmts, int index, String str) throws Exception { + if (StringUtil.isNotBlank(str)) { + pstmts.setString(index, str); + } else { + str = ""; + pstmts.setString(index, str); + } + } + + public static void setTimeStamp(PreparedStatement pstmts, int index, String str) throws Exception { + pstmts.setTimestamp(index, new Timestamp(Long.parseLong(str + "000"))); + } + + public static void setLong(PreparedStatement pstmts, int index, String str) throws Exception { + pstmts.setLong(index, Long.parseLong(str)); + } + + +} diff --git a/src/main/java/cn/ac/iie/utils/getjson/GetJsonToKafkaUtils.java b/src/main/java/cn/ac/iie/utils/getjson/GetJsonToKafkaUtils.java new file mode 100644 index 0000000..a73aa7d --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/getjson/GetJsonToKafkaUtils.java @@ -0,0 +1,835 @@ +//package cn.ac.iie.utils.getjson; +// +//import cn.ac.iie.bean.dk.DK_BEHAVIOR_LOG; +//import cn.ac.iie.bean.mm.MM_AV_IP_LOG; +//import cn.ac.iie.bean.mm.MM_VOIP_IP_LOG; +//import cn.ac.iie.bean.ntc.NTC_CONN_RECORD_LOG; +//import cn.ac.iie.bean.pxy.PXY_HTTP_LOG; +//import cn.ac.iie.common.FlowWriteConfig; +//import cn.ac.iie.common.RealtimeCountConfig; +//import cn.ac.iie.utils.ordinary.DecodeUtils; +//import cn.ac.iie.utils.ordinary.MD5Utils; +//import cn.ac.iie.utils.ordinary.TransFormUtils; +//import com.alibaba.fastjson.JSONObject; +//import com.alibaba.fastjson.parser.Feature; +//import com.zdjizhi.utils.IpLookup; +//import com.zdjizhi.utils.StringUtil; +//import org.apache.log4j.Logger; +// +///** +// * @author antlee +// * @date 2018/7/19 +// */ +//public class GetJsonToKafkaUtils { +// private static Logger logger = Logger.getLogger(GetJsonToKafkaUtils.class); +// private static IpLookup ipLookup = new IpLookup.Builder(false) +// .loadDataFileV4(RealtimeCountConfig.IP_LIBRARY) +// .loadDataFileV6(RealtimeCountConfig.IP_LIBRARY) +// .build(); +// +// +// /** +// * NTC topic对准类 +// * +// * @param message 日志 +// * @param topic topic名称 +// * @return 补全日志 +// */ +// public static String getNTCData(String message, String topic) { +// switch (topic) { +// case "NTC-HTTP-LOG": +// return httpReplenish(message); +// case "NTC-MAIL-LOG": +// return mailReplenish(message); +// case "NTC-IP-LOG": +// return ipReplenish(message); +// case "NTC-APP-LOG": +// return appReplenish(message); +// case "NTC-SSL-LOG": +// return sslReplenish(message); +// case "NTC-DDOS-LOG": +// return ddosReplenish(message); +// case "NTC-DNS-LOG": +// return dnsReplenish(message); +// case "NTC-COLLECT-MAIL-LOG": +// return mailReplenish(message); +// case "NTC-OPENVPN-LOG": +// return openVpnLog(message); +// case "NTC-P2P-LOG": +// return p2pReplenish(message); +// case "PXY-HTTP-LOG": +// return pxyHttpReplenish(message); +// case "NTC-BGP-LOG": +// return bgpReplenish(message); +// case "NTC-FTP-LOG": +// return ftpReplenish(message); +// case "NTC-STREAMING-MEDIA-LOG": +// return streamMediaReplenish(message); +// case "NTC-VOIP-LOG": +// return voipReplenish(message); +// +//// case "NTC-COLLECT-HTTP-LOG": +//// return message; +//// case "NTC-CONN-RECORD-LOG": +//// return message; +//// case "NTC-COLLECT-RADIUS-LOG": +//// return message; +//// case "NTC-KEYWORDS-URL-LOG": +//// return message; +// +//// case "NTC-IPSEC-LOG": +//// return ipsecLog(message); +//// case "NTC-L2TP-LOG": +//// return l2tpLog(message); +//// case "NTC-SSH-LOG": +//// return sshLog(message); +//// case "NTC-PPTP-LOG": +//// return pptpReplenish(message); +// default: +// logger.error("There is no corresponding topic! topic name is :" + topic); +// break; +// } +// return null; +// } +// +// +// /** +// * HTTP Log Replenish +// * +// * @param message +// * @return +// */ +// private static String httpReplenish(String message) { +// try { +// NTC_HTTP_LOG ntcHttpLog = JSONObject.parseObject(message, NTC_HTTP_LOG.class); +// String sIp = ntcHttpLog.getS_ip(); +// String dIp = ntcHttpLog.getD_ip(); +// ntcHttpLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcHttpLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcHttpLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcHttpLog.setS_asn(ipLookup.asnLookup(sIp, true)); +// ntcHttpLog.setWebsite(StringUtil.getDomain(ntcHttpLog.getUrl())); +//// ntcHttpLog.setD_subscribe_id(RedisPollUtils.getJedisCluster().get(dIP)); +//// ntcHttpLog.setS_subscribe_id(RedisPollUtils.getJedisCluster().get(sIp)); +// TransFormUtils.setHTTPFile(ntcHttpLog); +// return JSONObject.toJSONString(ntcHttpLog); +// } catch (Exception e) { +// logger.error(("HTTP 补全日志出现错误!!! ") + e); +// e.printStackTrace(); +// return ""; +// } +// } +// +// +// /** +// * HTTP 文件方法 +// * +// * @param message +// * @return +// */ +// +// public static String httpLogFile(String message) { +// try { +// NTC_HTTP_LOG ntcHttpLog = JSONObject.parseObject(message, NTC_HTTP_LOG.class); +// int cfgId = ntcHttpLog.getCfg_id(); +// if (StringUtil.isBlank(ntcHttpLog.getUrl())) { +// TransFormUtils.getUniFlow(ntcHttpLog); +// } +// String url = ntcHttpLog.getUrl(); +// String reqBody = ntcHttpLog.getReq_body_file(); +// if (StringUtil.isNotBlank(reqBody)) { +// ntcHttpLog.setReq_body_key(MD5Utils.md5Encode(cfgId + url + ntcHttpLog.getReq_body_file() + ntcHttpLog.getS_ip() + ntcHttpLog.getFound_time())); +// } +// ntcHttpLog.setRes_body_key(TransFormUtils.getHttpKey(cfgId, url, ntcHttpLog.getRes_body_file())); +// ntcHttpLog.setReq_hdr_key(TransFormUtils.getHttpKey(cfgId, url, ntcHttpLog.getReq_hdr_file())); +// ntcHttpLog.setRes_hdr_key(TransFormUtils.getHttpKey(cfgId, url, ntcHttpLog.getRes_hdr_file())); +// return JSONObject.toJSONString(ntcHttpLog); +// } catch (Exception e) { +// logger.error(("HTPP 文件补全出现错误!!! ") + e); +// e.printStackTrace(); +// return ""; +// } +// } +// +// /** +// * IP日志不全操作方法 +// * +// * @param message +// * @return +// */ +// private static String ipReplenish(String message) { +// try { +// NTC_IP_LOG ntcIpLog = JSONObject.parseObject(message, NTC_IP_LOG.class); +// String sIp = ntcIpLog.getS_ip(); +// String dIp = ntcIpLog.getD_ip(); +// ntcIpLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcIpLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcIpLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcIpLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// ntcIpLog.setS_subscribe_id(TransFormUtils.getSubscribe(sIp)); +//// ntcIpLog.setD_subscribe_id(TransFormUtils.getSubscribe(dIP)); +// return JSONObject.toJSONString(ntcIpLog); +// } catch (Exception e) { +// logger.error(("IP 日志补全操作出现异常!!! ") + e); +// e.printStackTrace(); +// return ""; +// } +// } +// +// /** +// * MAIL文件传输方法 +// * +// * @param message +// * @return +// */ +// public static String emailFile(String message) { +// try { +// NTC_MAIL_LOG ntcMailLog = JSONObject.parseObject(message, NTC_MAIL_LOG.class); +// ntcMailLog.setEml_key(TransFormUtils.getEmlKey(ntcMailLog.getFound_time(), ntcMailLog.getMail_from(), +// ntcMailLog.getMail_to(), ntcMailLog.getSubject(), ntcMailLog.getEml_file())); +// if (StringUtil.isNotBlank(ntcMailLog.getSubject())) { +// String subjectCharset = JSONObject.parseObject(message).getString("subject_charset"); +// ntcMailLog.setSubject(DecodeUtils.base64Str(ntcMailLog.getSubject(), subjectCharset)); +// } +// return JSONObject.toJSONString(ntcMailLog); +// } catch (Exception e) { +// logger.error(("MAIL 文件传输出现错误!!! ") + e); +// e.printStackTrace(); +// return ""; +// } +// } +// +// /** +// * MAIL 日志详细信息补全方法 +// * +// * @param message +// * @return +// */ +// private static String mailReplenish(String message) { +// try { +// NTC_MAIL_LOG ntcMailLog = JSONObject.parseObject(message, NTC_MAIL_LOG.class); +// String sIp = ntcMailLog.getS_ip(); +// String dIp = ntcMailLog.getD_ip(); +// ntcMailLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcMailLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcMailLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcMailLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// ntcMailLog.setD_subscribe_id(TransFormUtils.getSubscribe(dIP)); +//// ntcMailLog.setS_subscribe_id(TransFormUtils.getSubscribe(sIp)); +// TransFormUtils.setMailFile(ntcMailLog); +// return JSONObject.toJSONString(ntcMailLog); +// } catch (Exception e) { +// logger.error(("MAIL 日志补全出现错误!!! ") + e); +// e.printStackTrace(); +// return ""; +// } +// } +// +// /** +// * APP Log Replenish. +// * +// * @param message +// * @return +// */ +// private static String appReplenish(String message) { +// try { +// NTC_APP_LOG ntcAppLog = JSONObject.parseObject(message, NTC_APP_LOG.class); +// String sIp = ntcAppLog.getS_ip(); +// String dIp = ntcAppLog.getD_ip(); +// ntcAppLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcAppLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcAppLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcAppLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// ntcAppLog.setD_subscribe_id(RedisPollUtils.getJedisCluster().get(dIP)); +//// ntcAppLog.setS_subscribe_id(RedisPollUtils.getJedisCluster().get(sIp)); +// return JSONObject.toJSONString(ntcAppLog); +// } catch (Exception e) { +// logger.error(("APP 日志补全出现错误!!! ") + e); +// return ""; +// } +// } +// +// +// /** +// * DDOS Log Replenish +// * +// * @param message +// * @return +// */ +// private static String ddosReplenish(String message) { +// try { +// NTC_DDOS_LOG ntcDdosLog = JSONObject.parseObject(message, NTC_DDOS_LOG.class); +// String sIp = ntcDdosLog.getS_ip(); +// String dIp = ntcDdosLog.getD_ip(); +// ntcDdosLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcDdosLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcDdosLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcDdosLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// ntcDdosLog.setD_subscribe_id(RedisPollUtils.getJedisCluster().get(dIP)); +//// ntcDdosLog.setS_subscribe_id(RedisPollUtils.getJedisCluster().get(sIp)); +// return JSONObject.toJSONString(ntcDdosLog); +// } catch (Exception e) { +// logger.error(("DDOS 日志补全出现错误!!! ") + e); +// e.printStackTrace(); +// return ""; +// } +// } +// +// /** +// * SSL Log Replenish +// * +// * @param message +// * @return +// */ +// private static String sslReplenish(String message) { +// try { +// NTC_SSL_LOG ntcSslLog = JSONObject.parseObject(message, NTC_SSL_LOG.class); +// String sIp = ntcSslLog.getS_ip(); +// String dIp = ntcSslLog.getD_ip(); +// ntcSslLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcSslLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcSslLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcSslLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// ntcSslLog.setS_subscribe_id(TransFormUtils.getSubscribe(sIp)); +//// ntcSslLog.setD_subscribe_id(TransFormUtils.getSubscribe(dIP)); +// return JSONObject.toJSONString(ntcSslLog); +// } catch (Exception e) { +// logger.error("SSL 日志补全出现错误!!!" + e); +// e.printStackTrace(); +// return ""; +// } +// } +// +// /** +// * DNS Log Replenish +// * +// * @param message +// * @return +// */ +// private static String dnsReplenish(String message) { +// try { +// NTC_DNS_LOG ntcDnsLog = JSONObject.parseObject(message, NTC_DNS_LOG.class); +// String sIp = ntcDnsLog.getS_ip(); +// String dIp = ntcDnsLog.getD_ip(); +// ntcDnsLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcDnsLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcDnsLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcDnsLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// ntcDnsLog.setD_subscribe_id(RedisPollUtils.getJedisCluster().get(dIP)); +//// ntcDnsLog.setS_subscribe_id(RedisPollUtils.getJedisCluster().get(sIp)); +// return JSONObject.toJSONString(ntcDnsLog); +// } catch (Exception e) { +// logger.error(("DNS 日志补全出现错误!!! ") + e); +// e.printStackTrace(); +// return ""; +// } +// } +// +// /** +// * P2P Log Replenish +// * +// * @param message +// * @return +// */ +// private static String p2pReplenish(String message) { +// try { +// NTC_P2P_LOG ntcP2PLog = JSONObject.parseObject(message, NTC_P2P_LOG.class); +// String sIp = ntcP2PLog.getS_ip(); +// String dIp = ntcP2PLog.getD_ip(); +// ntcP2PLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcP2PLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcP2PLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcP2PLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// ntcP2PLog.setS_subscribe_id(TransFormUtils.getSubscribe(sIp)); +//// ntcP2PLog.setD_subscribe_id(TransFormUtils.getSubscribe(dIP)); +// return JSONObject.toJSONString(ntcP2PLog); +// } catch (Exception e) { +// logger.error(("P2P 日志补全出现错误!!! ") + e); +// e.printStackTrace(); +// return ""; +// } +// } +// +// /** +// * pxy 文件方法 +// * +// * @param message +// * @return +// */ +// +// public static String pxyLogFile(String message) { +// try { +// PXY_HTTP_LOG pxyHttpLog = JSONObject.parseObject(message, PXY_HTTP_LOG.class); +// int cfgId = pxyHttpLog.getCfg_id(); +// String url = pxyHttpLog.getUrl(); +// pxyHttpLog.setReq_body_key(MD5Utils.md5Encode(cfgId + url + pxyHttpLog.getReq_body() + pxyHttpLog.getS_ip() + pxyHttpLog.getFound_time())); +// pxyHttpLog.setResp_body_key(TransFormUtils.getHttpKey(cfgId, url, pxyHttpLog.getResp_body())); +// pxyHttpLog.setWebsite(StringUtil.getDomain(pxyHttpLog.getUrl())); +// return JSONObject.toJSONString(pxyHttpLog); +// } catch (Exception e) { +// logger.error(("PXY 文件补全出现错误!!! ") + e); +// e.printStackTrace(); +// return ""; +// } +// } +// +// +// /** +// * PXYHTTP Log Replenish +// * +// * @param message +// * @return +// */ +// private static String pxyHttpReplenish(String message) { +// try { +// PXY_HTTP_LOG pxyHttpLog = JSONObject.parseObject(message, PXY_HTTP_LOG.class); +// String sIp = pxyHttpLog.getS_ip(); +// String dIp = pxyHttpLog.getD_ip(); +// pxyHttpLog.setServer_locate(ipLookup.countryLookup(dIp)); +// pxyHttpLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// pxyHttpLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// pxyHttpLog.setS_asn(ipLookup.asnLookup(sIp, true)); +// if (StringUtil.isNotBlank(pxyHttpLog.getReq_body())) { +// pxyHttpLog.setReq_body("http://" + FlowWriteConfig.ASTANA_OSS_ADDRS + "/download/" + pxyHttpLog.getReq_body_key() + "." + "html"); +// } +// if (StringUtil.isNotBlank(pxyHttpLog.getResp_body())) { +// pxyHttpLog.setResp_body("http://" + FlowWriteConfig.ASTANA_OSS_ADDRS + "/download/" + pxyHttpLog.getResp_body_key() + "." + "html"); +// } +// return JSONObject.toJSONString(pxyHttpLog); +// } catch (Exception e) { +// logger.error("PXY 日志补全出现错误!!!" + e); +// e.printStackTrace(); +// return ""; +// } +// } +// +// /** +// * BGP Log Replenish +// * +// * @param message +// * @return +// */ +// private static String bgpReplenish(String message) { +// try { +// NTC_BGP_LOG ntcBgpLog = JSONObject.parseObject(message, NTC_BGP_LOG.class); +// String sIp = ntcBgpLog.getS_ip(); +// String dIp = ntcBgpLog.getD_ip(); +// ntcBgpLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcBgpLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcBgpLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcBgpLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// ntcBgpLog.setD_subscribe_id(RedisPollUtils.getJedisCluster().get(dIP)); +//// ntcBgpLog.setS_subscribe_id(RedisPollUtils.getJedisCluster().get(sIp)); +// return JSONObject.toJSONString(ntcBgpLog); +// } catch (Exception e) { +// logger.error(("BGP 日志补全出现错误!!! ") + e); +// e.printStackTrace(); +// return ""; +// } +// } +// +// /** +// * FTP Log Replenish +// * +// * @param message +// * @return +// */ +// private static String ftpReplenish(String message) { +// try { +// NTC_FTP_LOG ntcFtpLog = JSONObject.parseObject(message, NTC_FTP_LOG.class); +// String sIp = ntcFtpLog.getS_ip(); +// String dIp = ntcFtpLog.getD_ip(); +// ntcFtpLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcFtpLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcFtpLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcFtpLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// ntcFtpLog.setD_subscribe_id(RedisPollUtils.getJedisCluster().get(dIP)); +//// ntcFtpLog.setS_subscribe_id(RedisPollUtils.getJedisCluster().get(sIp)); +// return JSONObject.toJSONString(ntcFtpLog); +// } catch (Exception e) { +// logger.error(("FTP 日志补全出现错误!!! ") + e); +// e.printStackTrace(); +// return ""; +// } +// } +// +// /** +// * Stream Media Log Replenish +// * +// * @param message +// * @return +// */ +// private static String streamMediaReplenish(String message) { +// try { +// NTC_STREAMING_MEDIA_LOG ntcStreamingMediaLog = JSONObject.parseObject(message, NTC_STREAMING_MEDIA_LOG.class); +// String sIp = ntcStreamingMediaLog.getS_ip(); +// String dIp = ntcStreamingMediaLog.getD_ip(); +// ntcStreamingMediaLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcStreamingMediaLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcStreamingMediaLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcStreamingMediaLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// ntcStreamingMediaLog.setS_subscribe_id(TransFormUtils.getSubscribe(sIp)); +//// ntcStreamingMediaLog.setD_subscribe_id(TransFormUtils.getSubscribe(dIP)); +// return JSONObject.toJSONString(ntcStreamingMediaLog); +// } catch (Exception e) { +// logger.error(("Stream 日志补全出现错误!!! ") + e); +// e.printStackTrace(); +// return ""; +// } +// } +// +// /** +// * Voip Log Replenish +// * +// * @param message +// * @return +// */ +// private static String voipReplenish(String message) { +// try { +// NTC_VOIP_LOG ntcVoipLog = JSONObject.parseObject(message, NTC_VOIP_LOG.class); +// String sIp = ntcVoipLog.getS_ip(); +// String dIp = ntcVoipLog.getD_ip(); +// ntcVoipLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcVoipLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcVoipLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcVoipLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// ntcVoipLog.setS_subscribe_id(TransFormUtils.getSubscribe(sIp)); +//// ntcVoipLog.setD_subscribe_id(TransFormUtils.getSubscribe(dIP)); +// return JSONObject.toJSONString(ntcVoipLog); +// } catch (Exception e) { +// logger.error(("VOIP 日志补全出现错误!!! ") + e); +// e.printStackTrace(); +// return ""; +// } +// } +// +// private static String openVpnLog(String message) { +// try { +// NTC_OPENVPN_LOG ntcOpenvpnLog = JSONObject.parseObject(message, NTC_OPENVPN_LOG.class); +// String sIp = ntcOpenvpnLog.getS_ip(); +// String dIp = ntcOpenvpnLog.getD_ip(); +// ntcOpenvpnLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcOpenvpnLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcOpenvpnLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcOpenvpnLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// ntcOpenvpnLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcOpenvpnLog.getS_ip())); +//// ntcOpenvpnLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcOpenvpnLog.getD_ip())); +// return JSONObject.toJSONString(ntcOpenvpnLog); +// } catch (Exception e) { +// logger.error(("Log parsing error!!! ") + e); +// e.printStackTrace(); +// return ""; +// } +// } +// +// +// /** +// * 音视频topic对准类 +// * +// * @param message 日志 +// * @param topic topic名 +// * @return 补全后日志 +// */ +// public static String getMMData(String message, String topic) { +// switch (topic) { +// case "NTC-COLLECT-VOIP-LOG": +// return collectVoipLog(message); +// case "MM-PORN-AUDIO-LEVEL-LOG": +// return avIpLog(message); +// case "MM-PORN-VIDEO-LEVEL-LOG": +// return avIpLog(message); +// case "MM-SAMPLE-AUDIO-LOG": +// return avIpLog(message); +// case "MM-SAMPLE-VIDEO-LOG": +// return avIpLog(message); +// case "MM-SAMPLE-PIC-LOG": +// return avIpLog(message); +// case "MM-SAMPLE-VOIP-LOG": +// return voipIpLog(message); +// case "MM-FILE-DIGEST-LOG": +// return avIpLog(message); +// case "MM-AV-IP-LOG": +// return avIpLog(message); +// case "MM-SPEAKER-RECOGNIZATION-LOG": +// return avIpLog(message); +// case "MM-LOGO-DETECTION-LOG": +// return avIpLog(message); +// case "MM-FACE-RECOGNIZATION-LOG": +// return avIpLog(message); +// case "MM-AV-URL-LOG": +// return avIpLog(message); +// case "MM-PIC-IP-LOG": +// return avIpLog(message); +// case "MM-PIC-URL-LOG": +// return avIpLog(message); +// case "MM-VOIP-IP-LOG": +// return voipIpLog(message); +// case "MM-VOIP-ACCOUNT-LOG": +// return voipIpLog(message); +// default: +// logger.error("There is no corresponding topic! topic name is :" + topic); +// break; +// } +// return null; +// } +// +// +// private static String avIpLog(String message) { +// try { +// MM_AV_IP_LOG mmAvIpLog = JSONObject.parseObject(message, MM_AV_IP_LOG.class); +// String sIp = mmAvIpLog.getS_ip(); +// String dIp = mmAvIpLog.getD_ip(); +// mmAvIpLog.setServer_locate(ipLookup.countryLookup(dIp)); +// mmAvIpLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// mmAvIpLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// mmAvIpLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// mmAvIpLog.setS_subscribe_id(TransFormUtils.getSubscribe(sIp)); +//// mmAvIpLog.setD_subscribe_id(TransFormUtils.getSubscribe(dIP)); +// String url = mmAvIpLog.getLog_uri(); +// if (StringUtil.isNotBlank(url)) { +// String key = MD5Utils.md5Encode(url); +// String end = StringUtil.getFileExtendName(url).toLowerCase(); +// mmAvIpLog.setLog_uri("http://" + FlowWriteConfig.ALMATY_OSS_ADDRS + "/download/" + key + "." + end); +// } +// return JSONObject.toJSONString(mmAvIpLog); +// } catch (Exception e) { +// logger.error(("Log parsing error!!! ") + e); +// e.printStackTrace(); +// return ""; +// } +// } +// +// private static String voipIpLog(String message) { +// try { +// MM_VOIP_IP_LOG mmVoipIpLog = JSONObject.parseObject(message, MM_VOIP_IP_LOG.class); +// String sIp = mmVoipIpLog.getS_ip(); +// String dIp = mmVoipIpLog.getD_ip(); +// mmVoipIpLog.setServer_locate(ipLookup.countryLookup(dIp)); +// mmVoipIpLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// mmVoipIpLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// mmVoipIpLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// mmVoipIpLog.setS_subscribe_id(TransFormUtils.getSubscribe(sIp)); +//// mmVoipIpLog.setD_subscribe_id(TransFormUtils.getSubscribe(dIP)); +// String url = mmVoipIpLog.getLog_uri(); +// if (StringUtil.isNotBlank(url)) { +// String key = MD5Utils.md5Encode(url); +// String end = StringUtil.getFileExtendName(url).toLowerCase(); +// mmVoipIpLog.setLog_uri("http://" + FlowWriteConfig.ALMATY_OSS_ADDRS + "/download/" + key + "." + end); +// } +// return JSONObject.toJSONString(mmVoipIpLog); +// } catch (Exception e) { +// logger.error(("Log parsing error!!! ") + e); +// e.printStackTrace(); +// return ""; +// } +// } +// +// +// private static String collectVoipLog(String message) { +// try { +// NTC_COLLECT_VOIP_LOG ntcCollectVoipLog = JSONObject.parseObject(message, NTC_COLLECT_VOIP_LOG.class); +// if (StringUtil.isNotBlank(ntcCollectVoipLog.getTo_from_store_url())) { +// String key = MD5Utils.md5Encode(ntcCollectVoipLog.getPid() + ntcCollectVoipLog.getTo_from_store_url() + ntcCollectVoipLog.getFound_time()); +// String url = ntcCollectVoipLog.getTo_from_store_url(); +// String end = StringUtil.getFileExtendName(url).toLowerCase(); +// ntcCollectVoipLog.setTo_from_store_url("http://" + FlowWriteConfig.ALMATY_OSS_ADDRS + "/download/" + key + "." + end); +// } +// if (StringUtil.isNotBlank(ntcCollectVoipLog.getFrom_to_store_url())) { +// String key = MD5Utils.md5Encode(ntcCollectVoipLog.getPid() + ntcCollectVoipLog.getFrom_to_store_url() + ntcCollectVoipLog.getFound_time()); +// String url = ntcCollectVoipLog.getFrom_to_store_url(); +// String end = StringUtil.getFileExtendName(url).toLowerCase(); +// ntcCollectVoipLog.setFrom_to_store_url("http://" + FlowWriteConfig.ALMATY_OSS_ADDRS + "/download/" + key + "." + end); +// } +// return JSONObject.toJSONString(ntcCollectVoipLog); +// } catch (Exception e) { +// logger.error(("Log parsing error!!! ") + e); +// e.printStackTrace(); +// return ""; +// } +// } +// +// +// /** +// * 其他topic对准类 +// * +// * @param message 日志 +// * @return 补全后日志 +// */ +// public static String getOtherData(String message, String topic) { +// switch (topic) { +// case "NTC-HTTP-LOG": +// return otherHttpLog(message); +// case "NTC-CONN-RECORD-LOG": +// NTC_CONN_RECORD_LOG ntcConnRecordLog = JSONObject.parseObject(message, NTC_CONN_RECORD_LOG.class, Feature.OrderedField); +// ntcConnRecordLog.setD_asn(ipLookup.asnLookup(ntcConnRecordLog.getD_ip(), true)); +// ntcConnRecordLog.setS_asn(ipLookup.asnLookup(ntcConnRecordLog.getS_ip(), true)); +// return JSONObject.toJSONString(ntcConnRecordLog); +// default: +// logger.error("There is no corresponding topic! topic name is :" + topic); +// break; +// } +// return null; +// } +// +// +// private static String otherHttpLog(String message) { +// NTC_HTTP_LOG ntcHttpLog = JSONObject.parseObject(message, NTC_HTTP_LOG.class); +// if (ntcHttpLog.getService() == 152) { +// if (StringUtil.isBlank(ntcHttpLog.getUrl())) { +// TransFormUtils.getUniFlow(ntcHttpLog); +// } +// String sIp = ntcHttpLog.getS_ip(); +// String dIp = ntcHttpLog.getD_ip(); +// ntcHttpLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcHttpLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcHttpLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcHttpLog.setS_asn(ipLookup.asnLookup(sIp, true)); +// ntcHttpLog.setWebsite(StringUtil.getDomain(ntcHttpLog.getUrl())); +//// ntcHttpLog.setD_subscribe_id(RedisPollUtils.getJedisCluster().get(dIP)); +//// ntcHttpLog.setS_subscribe_id(RedisPollUtils.getJedisCluster().get(sIp)); +// return JSONObject.toJSONString(ntcHttpLog); +// } +// return ""; +// } +// +// +// /** +// * collect Mail to Mail +// * +// * @param message +// * @return +// */ +// public static String collectMailToMailLog(String message) { +// try { +// NTC_MAIL_LOG ntcMailLog = JSONObject.parseObject(message, NTC_MAIL_LOG.class); +// String sIp = ntcMailLog.getS_ip(); +// String dIp = ntcMailLog.getD_ip(); +// ntcMailLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcMailLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcMailLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcMailLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// ntcMailLog.setS_subscribe_id(TransFormUtils.getSubscribe(sIp)); +//// ntcMailLog.setD_subscribe_id(TransFormUtils.getSubscribe(dIP)); +// ntcMailLog.setEml_key(TransFormUtils.getEmlKey(ntcMailLog.getFound_time(), ntcMailLog.getMail_from(), +// ntcMailLog.getMail_to(), ntcMailLog.getSubject(), ntcMailLog.getEml_file())); +// if (StringUtil.isNotBlank(ntcMailLog.getSubject())) { +// String subjectCharset = JSONObject.parseObject(message).getString("subject_charset"); +// ntcMailLog.setSubject(DecodeUtils.base64Str(ntcMailLog.getSubject(), subjectCharset)); +// } +// TransFormUtils.setMailFile(ntcMailLog); +// return JSONObject.toJSONString(ntcMailLog); +// } catch (Exception e) { +// logger.error(("Log parsing error!!! ") + e); +// e.printStackTrace(); +// return ""; +// } +// } +// +// +// /** +// * ---------------------------------删除的topic------------------------------------------------------------ +// **/ +// +// private static String behaviorLog(String message) { +// try { +// DK_BEHAVIOR_LOG dkBehaviorLog = JSONObject.parseObject(message, DK_BEHAVIOR_LOG.class, Feature.OrderedField); +// dkBehaviorLog.setServer_locate(ipLookup.countryLookup(dkBehaviorLog.getD_ip())); +// dkBehaviorLog.setClient_locate(ipLookup.cityLookupDetail(dkBehaviorLog.getS_ip())); +// dkBehaviorLog.setD_asn(ipLookup.asnLookup(dkBehaviorLog.getD_ip()).trim()); +// dkBehaviorLog.setS_asn(ipLookup.asnLookup(dkBehaviorLog.getS_ip()).trim()); +//// dkBehaviorLog.setS_subscribe_id(TransFormUtils.getSubscribe(dkBehaviorLog.getS_ip())); +//// dkBehaviorLog.setD_subscribe_id(TransFormUtils.getSubscribe(dkBehaviorLog.getD_ip())); +// return JSONObject.toJSONString(dkBehaviorLog); +// } catch (Exception e) { +// logger.error(("Log parsing error!!! ") + e); +// e.printStackTrace(); +// return ""; +// } +// } +// +// private static String ipsecLog(String message) { +// try { +// NTC_IPSEC_LOG ntcIpsecLog = JSONObject.parseObject(message, NTC_IPSEC_LOG.class, Feature.OrderedField); +// ntcIpsecLog.setServer_locate(ipLookup.countryLookup(ntcIpsecLog.getD_ip())); +// ntcIpsecLog.setClient_locate(ipLookup.cityLookupDetail(ntcIpsecLog.getS_ip())); +// ntcIpsecLog.setD_asn(ipLookup.asnLookup(ntcIpsecLog.getD_ip()).trim()); +// ntcIpsecLog.setS_asn(ipLookup.asnLookup(ntcIpsecLog.getS_ip()).trim()); +//// ntcIpsecLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcIpsecLog.getS_ip())); +//// ntcIpsecLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcIpsecLog.getD_ip())); +// return JSONObject.toJSONString(ntcIpsecLog); +// } catch (Exception e) { +// logger.error(("Log parsing error!!! ") + e); +// e.printStackTrace(); +// return ""; +// } +// } +// +// private static String l2tpLog(String message) { +// try { +// NTC_L2TP_LOG ntcL2TPLog = JSONObject.parseObject(message, NTC_L2TP_LOG.class, Feature.OrderedField); +// ntcL2TPLog.setServer_locate(ipLookup.countryLookup(ntcL2TPLog.getD_ip())); +// ntcL2TPLog.setClient_locate(ipLookup.cityLookupDetail(ntcL2TPLog.getS_ip())); +// ntcL2TPLog.setD_asn(ipLookup.asnLookup(ntcL2TPLog.getD_ip()).trim()); +// ntcL2TPLog.setS_asn(ipLookup.asnLookup(ntcL2TPLog.getS_ip()).trim()); +//// ntcL2TPLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcL2TPLog.getS_ip())); +//// ntcL2TPLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcL2TPLog.getD_ip())); +// return JSONObject.toJSONString(ntcL2TPLog); +// } catch (Exception e) { +// logger.error(("Log parsing error!!! ") + e); +// e.printStackTrace(); +// return ""; +// } +// } +// +// +// private static String sshLog(String message) { +// try { +// NTC_SSH_LOG ntcSshLog = JSONObject.parseObject(message, NTC_SSH_LOG.class, Feature.OrderedField); +// ntcSshLog.setServer_locate(ipLookup.countryLookup(ntcSshLog.getD_ip())); +// ntcSshLog.setClient_locate(ipLookup.cityLookupDetail(ntcSshLog.getS_ip())); +// ntcSshLog.setD_asn(ipLookup.asnLookup(ntcSshLog.getD_ip()).trim()); +// ntcSshLog.setS_asn(ipLookup.asnLookup(ntcSshLog.getS_ip()).trim()); +// try { +// ntcSshLog.setVersion(ntcSshLog.getVersion().replaceAll("\n", "\\\\n")); +// ntcSshLog.setVersion(ntcSshLog.getVersion().replaceAll("\r", "\\\\r")); +// } catch (Exception e) { +// ntcSshLog.setVersion(""); +// } +//// ntcSshLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcSshLog.getS_ip())); +//// ntcSshLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcSshLog.getD_ip())); +// return JSONObject.toJSONString(ntcSshLog); +// } catch (Exception e) { +// logger.error("Log parsing error!!! " + e); +// e.printStackTrace(); +// return ""; +// } +// } +// +// +// /** +// * PPTP Log Replenish +// * +// * @param message +// * @return +// */ +// private static String pptpReplenish(String message) { +// try { +// NTC_PPTP_LOG ntcPptpLog = JSONObject.parseObject(message, NTC_PPTP_LOG.class); +// ntcPptpLog.setServer_locate(ipLookup.countryLookup(ntcPptpLog.getD_ip())); +// ntcPptpLog.setClient_locate(ipLookup.cityLookupDetail(ntcPptpLog.getS_ip())); +// ntcPptpLog.setD_asn(ipLookup.asnLookup(ntcPptpLog.getD_ip()).trim()); +// ntcPptpLog.setS_asn(ipLookup.asnLookup(ntcPptpLog.getS_ip()).trim()); +//// ntcPptpLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcPptpLog.getS_ip())); +//// ntcPptpLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcPptpLog.getD_ip())); +// return JSONObject.toJSONString(ntcPptpLog); +// } catch (Exception e) { +// logger.error(("Log parsing error!!! ") + e); +// e.printStackTrace(); +// return ""; +// } +// } +// +// +// /**---------------------------------删除的topic------------------------------------------------------------**/ +// +//} diff --git a/src/main/java/cn/ac/iie/utils/getjson/GetStrToClickHouseUtils.java b/src/main/java/cn/ac/iie/utils/getjson/GetStrToClickHouseUtils.java new file mode 100644 index 0000000..3ac5d1e --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/getjson/GetStrToClickHouseUtils.java @@ -0,0 +1,585 @@ +//package cn.ac.iie.utils.getjson; +// +//import cn.ac.iie.bean.dk.DK_BEHAVIOR_LOG; +//import cn.ac.iie.bean.mm.MM_AV_IP_LOG; +//import cn.ac.iie.bean.mm.MM_VOIP_IP_LOG; +//import cn.ac.iie.bean.ntc.NTC_CONN_RECORD_LOG; +//import cn.ac.iie.bean.ntcwidely.NTC_COLLECT_SSL_LOG; +//import cn.ac.iie.bean.pxy.PXY_HTTP_LOG; +//import cn.ac.iie.common.FlowWriteConfig; +//import cn.ac.iie.common.RealtimeCountConfig; +//import cn.ac.iie.utils.ordinary.DecodeUtils; +//import com.alibaba.fastjson.JSONObject; +//import com.alibaba.fastjson.parser.Feature; +//import com.zdjizhi.utils.IpLookup; +//import com.zdjizhi.utils.StringUtil; +//import org.apache.log4j.Logger; +// +///** +// * @author antlee +// * @date 2018/7/19 +// */ +//public class GetStrToClickHouseUtils { +// private static Logger logger = Logger.getLogger(GetStrToClickHouseUtils.class); +// private static IpLookup ipLookup = new IpLookup.Builder(false) +// .loadDataFileV4(RealtimeCountConfig.IP_LIBRARY) +// .loadDataFileV6(RealtimeCountConfig.IP_LIBRARY) +// .build(); +// +// /** +// * NTC topic对准类 +// * +// * @param message 日志 +// * @param topic topic名称 +// * @return 补全日志 +// */ +// public static String getData(String message, String topic) { +// switch (topic) { +// case "NTC-CONN-RECORD-LOG": +// return ntcConnLog(message); +// case "NTC-COLLECT-MAIL-LOG": +// return mailLog(message); +// case "NTC-COLLECT-SSL-LOG": +// return sslLog(message); +// case "NTC-SSL-LOG": +// return sslLog(message); +// case "NTC-APP-LOG": +// return appLog(message); +// case "NTC-HTTP-LOG": +// return httpLog(message); +// case "NTC-IP-LOG": +// return ipLog(message); +// case "PXY-HTTP-LOG": +// return pHttpLog(message); +// case "NTC-DNS-LOG": +// return dnsLog(message); +// case "NTC-BGP-LOG": +// return bgpLog(message); +// case "NTC-DDOS-LOG": +// return ddosLog(message); +// case "NTC-FTP-LOG": +// return ftpLog(message); +// case "NTC-MAIL-LOG": +// return mailLog(message); +// case "NTC-OPENVPN-LOG": +// return openVpnLog(message); +// case "NTC-P2P-LOG": +// return p2pLog(message); +// case "NTC-STREAMING-MEDIA-LOG": +// return streamMediaLog(message); +// case "NTC-VOIP-LOG": +// return voipLog(message); +// case "NTC-KEYWORDS-URL-LOG": +// return message; +// case "MM-SAMPLE-AUDIO-LOG": +// return avIpLog(message); +// case "MM-SAMPLE-VIDEO-LOG": +// return avIpLog(message); +// case "MM-PORN-AUDIO-LEVEL-LOG": +// return avIpLog(message); +// case "MM-PORN-VIDEO-LEVEL-LOG": +// return avIpLog(message); +// case "MM-SAMPLE-PIC-LOG": +// return avIpLog(message); +// case "MM-SAMPLE-VOIP-LOG": +// return voipIpLog(message); +// case "MM-FILE-DIGEST-LOG": +// return avIpLog(message); +// case "NTC-COLLECT-VOIP-LOG": +// return collectVoipLog(message); +// +//// case "MM-AV-IP-LOG": +//// return avIpLog(message); +//// case "MM-AV-URL-LOG": +//// return avIpLog(message); +//// case "MM-PIC-IP-LOG": +//// return avIpLog(message); +//// case "MM-PIC-URL-LOG": +//// return avIpLog(message); +//// case "MM-VOIP-IP-LOG": +//// return voipIpLog(message); +//// case "MM-SPEAKER-RECOGNIZATION-LOG": +//// return avIpLog(message); +//// case "MM-LOGO-DETECTION-LOG": +//// return avIpLog(message); +//// case "MM-FACE-RECOGNIZATION-LOG": +//// return avIpLog(message); +//// case "MM-VOIP-ACCOUNT-LOG": +//// return voipIpLog(message); +//// case "NTC-IPSEC-LOG": +//// return ipsecLog(message); +//// case "NTC-L2TP-LOG": +//// return l2tpLog(message); +//// case "NTC-PPTP-LOG": +//// return pptpLog(message); +//// case "NTC-SSH-LOG": +//// return sshLog(message); +// +// default: +// logger.error("There is no corresponding topic! topic name is :" + topic); +// break; +// } +// return null; +// } +// +// +// private static String ntcConnLog(String message) { +// try { +// NTC_CONN_RECORD_LOG ntcConnRecordLog = JSONObject.parseObject(message, NTC_CONN_RECORD_LOG.class); +// String appLabel = ntcConnRecordLog.getApp_label(); +// String sIp = ntcConnRecordLog.getS_ip(); +// String dIp = ntcConnRecordLog.getD_ip(); +// ntcConnRecordLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcConnRecordLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcConnRecordLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcConnRecordLog.setS_asn(ipLookup.asnLookup(sIp, true)); +// if (StringUtil.isNotBlank(appLabel)) { +// String[] split = ntcConnRecordLog.getApp_label().split(";"); +// ntcConnRecordLog.setProto_id(Integer.parseInt(split[0].split("=")[1])); +// ntcConnRecordLog.setApp_id(Integer.parseInt(split[1].split("=")[1])); +// ntcConnRecordLog.setOs_id(Integer.parseInt(split[2].split("=")[1])); +// ntcConnRecordLog.setBs_id(Integer.parseInt(split[3].split("=")[1])); +// ntcConnRecordLog.setWeb_id(Integer.parseInt(split[4].split("=")[1])); +// ntcConnRecordLog.setBehav_id(Integer.parseInt(split[5].split("=")[1])); +// } else { +// ntcConnRecordLog.setProto_id(0); +// ntcConnRecordLog.setApp_id(0); +// ntcConnRecordLog.setOs_id(0); +// ntcConnRecordLog.setBs_id(0); +// ntcConnRecordLog.setWeb_id(0); +// ntcConnRecordLog.setBehav_id(0); +// } +// return JSONObject.toJSONString(ntcConnRecordLog); +// } catch (Exception e) { +// logger.error("Log parsing error!!! \n" + message); +// e.printStackTrace(); +// return ""; +// } +// } +// +// +// private static String avIpLog(String message) { +// try { +// MM_AV_IP_LOG mmAvIpLog = JSONObject.parseObject(message, MM_AV_IP_LOG.class); +// String sIp = mmAvIpLog.getS_ip(); +// String dIp = mmAvIpLog.getD_ip(); +// mmAvIpLog.setServer_locate(ipLookup.countryLookup(dIp)); +// mmAvIpLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// mmAvIpLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// mmAvIpLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// mmAvIpLog.setS_subscribe_id(TransFormUtils.getSubscribe(mmAvIpLog.getS_ip())); +//// mmAvIpLog.setD_subscribe_id(TransFormUtils.getSubscribe(mmAvIpLog.getD_ip())); +// return JSONObject.toJSONString(mmAvIpLog); +// } catch (Exception e) { +// logger.error("Log parsing error!!! \n" + message); +// e.printStackTrace(); +// return ""; +// } +// } +// +// private static String voipIpLog(String message) { +// try { +// MM_VOIP_IP_LOG mmVoipIpLog = JSONObject.parseObject(message, MM_VOIP_IP_LOG.class); +// String sIp = mmVoipIpLog.getS_ip(); +// String dIp = mmVoipIpLog.getD_ip(); +// mmVoipIpLog.setServer_locate(ipLookup.countryLookup(dIp)); +// mmVoipIpLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// mmVoipIpLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// mmVoipIpLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// mmVoipIpLog.setS_subscribe_id(TransFormUtils.getSubscribe(mmVoipIpLog.getS_ip())); +//// mmVoipIpLog.setD_subscribe_id(TransFormUtils.getSubscribe(mmVoipIpLog.getD_ip())); +// return JSONObject.toJSONString(mmVoipIpLog); +// } catch (Exception e) { +// logger.error("Log parsing error!!! \n" + message); +// e.printStackTrace(); +// return ""; +// } +// } +// +// private static String collectVoipLog(String message) { +// try { +// NTC_COLLECT_VOIP_LOG ntcCollectVoipLog = JSONObject.parseObject(message, NTC_COLLECT_VOIP_LOG.class); +// return JSONObject.toJSONString(ntcCollectVoipLog); +// } catch (Exception e) { +// e.printStackTrace(); +// logger.error("Log parsing error!!! \n" + message); +// return ""; +// } +// } +// +// +// private static String appLog(String message) { +// try { +// NTC_APP_LOG ntcAppLog = JSONObject.parseObject(message, NTC_APP_LOG.class); +// String sIp = ntcAppLog.getS_ip(); +// String dIp = ntcAppLog.getD_ip(); +// ntcAppLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcAppLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcAppLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcAppLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// ntcAppLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcAppLog.getS_ip())); +//// ntcAppLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcAppLog.getD_ip())); +// return JSONObject.toJSONString(ntcAppLog); +// } catch (Exception e) { +// e.printStackTrace(); +// logger.error("Log parsing error!!! \n" + message); +// return ""; +// } +// } +// +// private static String bgpLog(String message) { +// try { +// NTC_BGP_LOG ntcBgpLog = JSONObject.parseObject(message, NTC_BGP_LOG.class); +// String sIp = ntcBgpLog.getS_ip(); +// String dIp = ntcBgpLog.getD_ip(); +// ntcBgpLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcBgpLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcBgpLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcBgpLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// ntcBgpLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcBgpLog.getS_ip())); +//// ntcBgpLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcBgpLog.getD_ip())); +// return JSONObject.toJSONString(ntcBgpLog); +// } catch (Exception e) { +// e.printStackTrace(); +// logger.error("Log parsing error!!! \n" + message); +// return ""; +// } +// } +// +// private static String ddosLog(String message) { +// try { +// NTC_DDOS_LOG ntcDdosLog = JSONObject.parseObject(message, NTC_DDOS_LOG.class); +// String sIp = ntcDdosLog.getS_ip(); +// String dIp = ntcDdosLog.getD_ip(); +// ntcDdosLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcDdosLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcDdosLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcDdosLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// ntcDdosLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcDdosLog.getS_ip())); +//// ntcDdosLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcDdosLog.getD_ip())); +// return JSONObject.toJSONString(ntcDdosLog); +// } catch (Exception e) { +// e.printStackTrace(); +// logger.error("Log parsing error!!! \n" + message); +// return ""; +// } +// } +// +// private static String dnsLog(String message) { +// try { +// NTC_DNS_LOG ntcDnsLog = JSONObject.parseObject(message, NTC_DNS_LOG.class); +// String sIp = ntcDnsLog.getS_ip(); +// String dIp = ntcDnsLog.getD_ip(); +// ntcDnsLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcDnsLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcDnsLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcDnsLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// ntcDnsLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcDnsLog.getS_ip())); +//// ntcDnsLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcDnsLog.getD_ip())); +// return JSONObject.toJSONString(ntcDnsLog); +// } catch (Exception e) { +// e.printStackTrace(); +// logger.error("Log parsing error!!! \n" + message); +// return ""; +// } +// } +// +// private static String ftpLog(String message) { +// try { +// NTC_FTP_LOG ntcFtpLog = JSONObject.parseObject(message, NTC_FTP_LOG.class); +// String sIp = ntcFtpLog.getS_ip(); +// String dIp = ntcFtpLog.getD_ip(); +// ntcFtpLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcFtpLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcFtpLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcFtpLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// ntcFtpLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcFtpLog.getS_ip())); +//// ntcFtpLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcFtpLog.getD_ip())); +// return JSONObject.toJSONString(ntcFtpLog); +// } catch (Exception e) { +// e.printStackTrace(); +// logger.error("Log parsing error!!! \n" + message); +// return ""; +// } +// } +// +// private static String httpLog(String message) { +// try { +// NTC_HTTP_LOG ntcHttpLog = JSONObject.parseObject(message, NTC_HTTP_LOG.class); +//// if (StringUtil.isBlank(ntcHttpLog.getUrl())) { +//// TransFormUtils.getUniFlow(ntcHttpLog); +//// } +// String sIp = ntcHttpLog.getS_ip(); +// String dIp = ntcHttpLog.getD_ip(); +// ntcHttpLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcHttpLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcHttpLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcHttpLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// ntcHttpLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcHttpLog.getS_ip())); +//// ntcHttpLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcHttpLog.getD_ip())); +// ntcHttpLog.setWebsite(StringUtil.getDomain(ntcHttpLog.getUrl())); +// return JSONObject.toJSONString(ntcHttpLog); +// } catch (Exception e) { +// e.printStackTrace(); +// logger.error("Log parsing error!!! \n" + message); +// return ""; +// } +// } +// +// private static String ipLog(String message) { +// try { +// NTC_IP_LOG ntcIpLog = JSONObject.parseObject(message, NTC_IP_LOG.class); +// String sIp = ntcIpLog.getS_ip(); +// String dIp = ntcIpLog.getD_ip(); +// ntcIpLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcIpLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcIpLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcIpLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// ntcIpLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcIpLog.getS_ip())); +//// ntcIpLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcIpLog.getD_ip())); +// return JSONObject.toJSONString(ntcIpLog); +// } catch (Exception e) { +// e.printStackTrace(); +// logger.error("Log parsing error!!! \n" + message); +// return ""; +// } +// } +// +// +// private static String mailLog(String message) { +// try { +// NTC_MAIL_LOG ntcMailLog = JSONObject.parseObject(message, NTC_MAIL_LOG.class); +// String sIp = ntcMailLog.getS_ip(); +// String dIp = ntcMailLog.getD_ip(); +// ntcMailLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcMailLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcMailLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcMailLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// ntcMailLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcMailLog.getS_ip())); +//// ntcMailLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcMailLog.getD_ip())); +// if (StringUtil.isNotBlank(ntcMailLog.getSubject())) { +// String subjectCharset = JSONObject.parseObject(message).getString("subject_charset"); +// ntcMailLog.setSubject(DecodeUtils.base64Str(ntcMailLog.getSubject(), subjectCharset)); +// } +// return JSONObject.toJSONString(ntcMailLog); +// } catch (Exception e) { +// e.printStackTrace(); +// logger.error("Log parsing error!!! \n" + message); +// return ""; +// } +// } +// +// private static String openVpnLog(String message) { +// try { +// NTC_OPENVPN_LOG ntcOpenvpnLog = JSONObject.parseObject(message, NTC_OPENVPN_LOG.class); +// String sIp = ntcOpenvpnLog.getS_ip(); +// String dIp = ntcOpenvpnLog.getD_ip(); +// ntcOpenvpnLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcOpenvpnLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcOpenvpnLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcOpenvpnLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// ntcOpenvpnLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcOpenvpnLog.getS_ip())); +//// ntcOpenvpnLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcOpenvpnLog.getD_ip())); +// return JSONObject.toJSONString(ntcOpenvpnLog); +// } catch (Exception e) { +// e.printStackTrace(); +// logger.error("Log parsing error!!! \n" + message); +// return ""; +// } +// } +// +// private static String p2pLog(String message) { +// try { +// NTC_P2P_LOG ntcP2PLog = JSONObject.parseObject(message, NTC_P2P_LOG.class); +// String sIp = ntcP2PLog.getS_ip(); +// String dIp = ntcP2PLog.getD_ip(); +// ntcP2PLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcP2PLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcP2PLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcP2PLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// ntcP2PLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcP2PLog.getS_ip())); +//// ntcP2PLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcP2PLog.getD_ip())); +// return JSONObject.toJSONString(ntcP2PLog); +// } catch (Exception e) { +// e.printStackTrace(); +// logger.error("Log parsing error!!! \n" + message); +// return ""; +// } +// } +// +// +// private static String streamMediaLog(String message) { +// try { +// NTC_STREAMING_MEDIA_LOG ntcStreamingMediaLog = JSONObject.parseObject(message, NTC_STREAMING_MEDIA_LOG.class); +// String sIp = ntcStreamingMediaLog.getS_ip(); +// String dIp = ntcStreamingMediaLog.getD_ip(); +// ntcStreamingMediaLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcStreamingMediaLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcStreamingMediaLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcStreamingMediaLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// ntcStreamingMediaLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcStreamingMediaLog.getS_ip())); +//// ntcStreamingMediaLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcStreamingMediaLog.getD_ip())); +// return JSONObject.toJSONString(ntcStreamingMediaLog); +// } catch (Exception e) { +// e.printStackTrace(); +// logger.error("Log parsing error!!! \n" + message); +// return ""; +// } +// } +// +// private static String voipLog(String message) { +// try { +// NTC_VOIP_LOG ntcVoipLog = JSONObject.parseObject(message, NTC_VOIP_LOG.class); +// String sIp = ntcVoipLog.getS_ip(); +// String dIp = ntcVoipLog.getD_ip(); +// ntcVoipLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcVoipLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcVoipLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcVoipLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// ntcVoipLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcVoipLog.getS_ip())); +//// ntcVoipLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcVoipLog.getD_ip())); +// return JSONObject.toJSONString(ntcVoipLog); +// } catch (Exception e) { +// e.printStackTrace(); +// logger.error("Log parsing error!!! \n" + message); +// return ""; +// } +// } +// +// +// private static String sslLog(String message) { +// try { +// NTC_COLLECT_SSL_LOG ntcSslLog = JSONObject.parseObject(message, NTC_COLLECT_SSL_LOG.class); +// String sIp = ntcSslLog.getS_ip(); +// String dIp = ntcSslLog.getD_ip(); +// ntcSslLog.setServer_locate(ipLookup.countryLookup(dIp)); +// ntcSslLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// ntcSslLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// ntcSslLog.setS_asn(ipLookup.asnLookup(sIp, true)); +//// ntcSslLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcSslLog.getS_ip())); +//// ntcSslLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcSslLog.getD_ip())); +// return JSONObject.toJSONString(ntcSslLog); +// } catch (Exception e) { +// e.printStackTrace(); +// logger.error("Log parsing error!!! \n" + message); +// return ""; +// } +// } +// +// private static String pHttpLog(String message) { +// try { +// PXY_HTTP_LOG pxyHttpLog = JSONObject.parseObject(message, PXY_HTTP_LOG.class); +// String sIp = pxyHttpLog.getS_ip(); +// String dIp = pxyHttpLog.getD_ip(); +// pxyHttpLog.setServer_locate(ipLookup.countryLookup(dIp)); +// pxyHttpLog.setClient_locate(ipLookup.cityLookupDetail(sIp)); +// pxyHttpLog.setD_asn(ipLookup.asnLookup(dIp, true)); +// pxyHttpLog.setS_asn(ipLookup.asnLookup(sIp, true)); +// pxyHttpLog.setWebsite(StringUtil.getDomain(pxyHttpLog.getUrl())); +// return JSONObject.toJSONString(pxyHttpLog); +// } catch (Exception e) { +// e.printStackTrace(); +// logger.error("Log parsing error!!! \n" + message); +// return ""; +// } +// } +// +// +// /** +// * ---------------------------------删除的topic------------------------------------------------------------ +// **/ +// +// +// private static String behaviorLog(String message) { +// try { +// DK_BEHAVIOR_LOG dkBehaviorLog = JSONObject.parseObject(message, DK_BEHAVIOR_LOG.class, Feature.OrderedField); +// dkBehaviorLog.setServer_locate(ipLookup.countryLookup(dkBehaviorLog.getD_ip())); +// dkBehaviorLog.setClient_locate(ipLookup.cityLookupDetail(dkBehaviorLog.getS_ip())); +// dkBehaviorLog.setD_asn(ipLookup.asnLookup(dkBehaviorLog.getD_ip()).trim()); +// dkBehaviorLog.setS_asn(ipLookup.asnLookup(dkBehaviorLog.getS_ip()).trim()); +//// dkBehaviorLog.setS_subscribe_id(TransFormUtils.getSubscribe(dkBehaviorLog.getS_ip())); +//// dkBehaviorLog.setD_subscribe_id(TransFormUtils.getSubscribe(dkBehaviorLog.getD_ip())); +// return JSONObject.toJSONString(dkBehaviorLog); +// } catch (Exception e) { +// logger.error("Log parsing error!!! \n" + message); +// e.printStackTrace(); +// return ""; +// } +// } +// +// private static String sshLog(String message) { +// try { +// NTC_SSH_LOG ntcSshLog = JSONObject.parseObject(message, NTC_SSH_LOG.class, Feature.OrderedField); +// ntcSshLog.setServer_locate(ipLookup.countryLookup(ntcSshLog.getD_ip())); +// ntcSshLog.setClient_locate(ipLookup.cityLookupDetail(ntcSshLog.getS_ip())); +// ntcSshLog.setD_asn(ipLookup.asnLookup(ntcSshLog.getD_ip()).trim()); +// ntcSshLog.setS_asn(ipLookup.asnLookup(ntcSshLog.getS_ip()).trim()); +// try { +// ntcSshLog.setVersion(ntcSshLog.getVersion().replaceAll("\n", "\\\\n")); +// ntcSshLog.setVersion(ntcSshLog.getVersion().replaceAll("\r", "\\\\r")); +// } catch (Exception e) { +// ntcSshLog.setVersion(""); +// } +//// ntcSshLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcSshLog.getS_ip())); +//// ntcSshLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcSshLog.getD_ip())); +// return JSONObject.toJSONString(ntcSshLog); +// } catch (Exception e) { +// e.printStackTrace(); +// logger.error("Log parsing error!!! \n" + message); +// return ""; +// } +// } +// +// private static String pptpLog(String message) { +// try { +// NTC_PPTP_LOG ntcPptpLog = JSONObject.parseObject(message, NTC_PPTP_LOG.class, Feature.OrderedField); +// ntcPptpLog.setServer_locate(ipLookup.countryLookup(ntcPptpLog.getD_ip())); +// ntcPptpLog.setClient_locate(ipLookup.cityLookupDetail(ntcPptpLog.getS_ip())); +// ntcPptpLog.setD_asn(ipLookup.asnLookup(ntcPptpLog.getD_ip()).trim()); +// ntcPptpLog.setS_asn(ipLookup.asnLookup(ntcPptpLog.getS_ip()).trim()); +//// ntcPptpLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcPptpLog.getS_ip())); +//// ntcPptpLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcPptpLog.getD_ip())); +// return JSONObject.toJSONString(ntcPptpLog); +// } catch (Exception e) { +// e.printStackTrace(); +// logger.error("Log parsing error!!! \n" + message); +// return ""; +// } +// } +// +// private static String l2tpLog(String message) { +// try { +// NTC_L2TP_LOG ntcL2TPLog = JSONObject.parseObject(message, NTC_L2TP_LOG.class, Feature.OrderedField); +// ntcL2TPLog.setServer_locate(ipLookup.countryLookup(ntcL2TPLog.getD_ip())); +// ntcL2TPLog.setClient_locate(ipLookup.cityLookupDetail(ntcL2TPLog.getS_ip())); +// ntcL2TPLog.setD_asn(ipLookup.asnLookup(ntcL2TPLog.getD_ip()).trim()); +// ntcL2TPLog.setS_asn(ipLookup.asnLookup(ntcL2TPLog.getS_ip()).trim()); +//// ntcL2TPLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcL2TPLog.getS_ip())); +//// ntcL2TPLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcL2TPLog.getD_ip())); +// return JSONObject.toJSONString(ntcL2TPLog); +// } catch (Exception e) { +// e.printStackTrace(); +// logger.error("Log parsing error!!! \n" + message); +// return ""; +// } +// } +// +// private static String ipsecLog(String message) { +// try { +// NTC_IPSEC_LOG ntcIpsecLog = JSONObject.parseObject(message, NTC_IPSEC_LOG.class, Feature.OrderedField); +// ntcIpsecLog.setServer_locate(ipLookup.countryLookup(ntcIpsecLog.getD_ip())); +// ntcIpsecLog.setClient_locate(ipLookup.cityLookupDetail(ntcIpsecLog.getS_ip())); +// ntcIpsecLog.setD_asn(ipLookup.asnLookup(ntcIpsecLog.getD_ip()).trim()); +// ntcIpsecLog.setS_asn(ipLookup.asnLookup(ntcIpsecLog.getS_ip()).trim()); +//// ntcIpsecLog.setS_subscribe_id(TransFormUtils.getSubscribe(ntcIpsecLog.getS_ip())); +//// ntcIpsecLog.setD_subscribe_id(TransFormUtils.getSubscribe(ntcIpsecLog.getD_ip())); +// return JSONObject.toJSONString(ntcIpsecLog); +// } catch (Exception e) { +// e.printStackTrace(); +// logger.error("Log parsing error!!! \n" + message); +// return ""; +// } +// } +// +// +//} diff --git a/src/main/java/cn/ac/iie/utils/redis/RedisPollUtils.java b/src/main/java/cn/ac/iie/utils/redis/RedisPollUtils.java new file mode 100644 index 0000000..23407dd --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/redis/RedisPollUtils.java @@ -0,0 +1,61 @@ +package cn.ac.iie.utils.redis; + +import org.apache.log4j.Logger; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPoolConfig; + +import java.io.IOException; +import java.util.LinkedHashSet; +import java.util.Properties; +import java.util.Set; + +/** + * Redis连接池 + * + * @author my + * @date 2018-07-04 + */ +public final class RedisPollUtils { + private static final Logger logger = Logger.getLogger(RedisPollUtils.class); + private static JedisCluster jedisCluster; + private static Properties props = new Properties(); + + /** + * 不允许通过new创建该类的实例 + */ + private RedisPollUtils() { + } + + /** + * 初始化Redis连接池 + */ + public static JedisCluster getJedisCluster() { + try { + String redisConfigFile = "redis_aaa_config.properties"; + props.load(RedisPollUtils.class.getClassLoader().getResourceAsStream(redisConfigFile)); + } catch (IOException e) { + logger.error("Properties Initialization Failed!!!!"); + e.printStackTrace(); + } + JedisPoolConfig poolConfig = new JedisPoolConfig(); + poolConfig.setMaxTotal(Integer.valueOf(props.getProperty("jedis.pool.maxActive"))); + poolConfig.setMaxIdle(Integer.valueOf(props.getProperty("jedis.pool.maxIdle"))); + poolConfig.setMaxWaitMillis(Long.valueOf(props.getProperty("jedis.pool.maxWait"))); + poolConfig.setTestOnReturn(Boolean.valueOf(props.getProperty("jedis.pool.testOnReturn"))); + poolConfig.setTestOnBorrow(Boolean.valueOf(props.getProperty("jedis.pool.testOnBorrow"))); + Set<HostAndPort> nodes = new LinkedHashSet<HostAndPort>(); + for (String port : props.getProperty("redis.port").split(",")) { + nodes.add(new HostAndPort(props.getProperty("redis.ip1"), Integer.parseInt(port))); + nodes.add(new HostAndPort(props.getProperty("redis.ip2"), Integer.parseInt(port))); + nodes.add(new HostAndPort(props.getProperty("redis.ip3"), Integer.parseInt(port))); + nodes.add(new HostAndPort(props.getProperty("redis.ip4"), Integer.parseInt(port))); + nodes.add(new HostAndPort(props.getProperty("redis.ip5"), Integer.parseInt(port))); + } + if (jedisCluster == null) { + jedisCluster = new JedisCluster(nodes, poolConfig); + } + return jedisCluster; + } + +} diff --git a/src/main/java/cn/ac/iie/utils/redis/RedisUrlPollUtils.java b/src/main/java/cn/ac/iie/utils/redis/RedisUrlPollUtils.java new file mode 100644 index 0000000..98cd1be --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/redis/RedisUrlPollUtils.java @@ -0,0 +1,61 @@ +package cn.ac.iie.utils.redis; + +import org.apache.log4j.Logger; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPoolConfig; + +import java.util.LinkedHashSet; +import java.util.Properties; +import java.util.Set; + +/** + * Redis连接池 + * + * @author my + * @date 2018-07-04 + */ +public class RedisUrlPollUtils { + private static final Logger logger = Logger.getLogger(RedisUrlPollUtils.class); + private static String redisConfigFile = "redis_url_config.properties"; + private static JedisCluster jedisCluster; + private static Properties props = new Properties(); + + /** + * 不允许通过new创建该类的实例 + */ + private RedisUrlPollUtils() { + } + + /** + * 初始化Redis连接池 + */ + public static JedisCluster getJedisCluster() { + try { + props.load(RedisUrlPollUtils.class.getClassLoader().getResourceAsStream(redisConfigFile)); + JedisPoolConfig poolConfig = new JedisPoolConfig(); + poolConfig.setMaxTotal(Integer.valueOf(props.getProperty("jedis.pool.maxActive"))); + poolConfig.setMaxIdle(Integer.valueOf(props.getProperty("jedis.pool.maxIdle"))); + poolConfig.setMaxWaitMillis(Long.valueOf(props.getProperty("jedis.pool.maxWait"))); + poolConfig.setTestOnBorrow(Boolean.valueOf(props.getProperty("jedis.pool.testOnBorrow"))); + poolConfig.setTestOnReturn(Boolean.valueOf(props.getProperty("jedis.pool.testOnReturn"))); + Set<HostAndPort> nodes = new LinkedHashSet<HostAndPort>(); + for (String port : props.getProperty("redis.port").split(",")) { + nodes.add(new HostAndPort(props.getProperty("redis.ip1"), Integer.parseInt(port))); + nodes.add(new HostAndPort(props.getProperty("redis.ip2"), Integer.parseInt(port))); + nodes.add(new HostAndPort(props.getProperty("redis.ip3"), Integer.parseInt(port))); + nodes.add(new HostAndPort(props.getProperty("redis.ip4"), Integer.parseInt(port))); + nodes.add(new HostAndPort(props.getProperty("redis.ip5"), Integer.parseInt(port))); + } + if (jedisCluster == null) { + jedisCluster = new JedisCluster(nodes, poolConfig); + } + return jedisCluster; + } catch (Exception e) { + logger.error(("JedisCluster Connection creation Failed!!!") + e); + e.printStackTrace(); + return null; + } + } + +} |
