summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r-- properties/file_type.properties 5
-rw-r--r-- src/main/java/com/zdjizhi/bean/FileMeta.java 51
-rw-r--r-- src/main/java/com/zdjizhi/bean/SourceList.java 22
-rw-r--r-- src/main/java/com/zdjizhi/utils/functions/DealFileProcessFunction.java 119
-rw-r--r-- src/main/java/com/zdjizhi/utils/general/FileEdit.java 47
-rw-r--r-- src/main/java/com/zdjizhi/utils/ordinary/MD5Utils.java 64
6 files changed, 308 insertions, 0 deletions
diff --git a/properties/file_type.properties b/properties/file_type.properties
new file mode 100644
index 0000000..8ffc908
--- /dev/null
+++ b/properties/file_type.properties
@@ -0,0 +1,5 @@
+txt
+html
+eml
+jpg
+png
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/bean/FileMeta.java b/src/main/java/com/zdjizhi/bean/FileMeta.java
new file mode 100644
index 0000000..e24e0b4
--- /dev/null
+++ b/src/main/java/com/zdjizhi/bean/FileMeta.java
@@ -0,0 +1,51 @@
+package com.zdjizhi.bean;
+
+import com.alibaba.fastjson.JSONArray;
+
+/**
+ * Mutable bean holding the file metadata produced for one log record.
+ *
+ * <p>It carries a log identifier, a receive time, a schema type, a processing time and a
+ * {@link JSONArray} of source entries. Nothing is validated or copied: each setter stores the
+ * value it is given and each getter returns the stored value unchanged.
+ */
+public class FileMeta {
+
+    /** Identifier of the log record. */
+    private long common_log_id;
+
+    /** Receive time of the log record; the only field declared {@code protected}. */
+    protected int common_recv_time;
+
+    /** Schema type of the log record. */
+    private String common_schema_type;
+
+    /** Source entries attached to the log record. */
+    private JSONArray sourceList;
+
+    /** Time at which the record was processed. */
+    private int processing_time;
+
+    /** @return the stored log identifier */
+    public long getCommon_log_id() {
+        return this.common_log_id;
+    }
+
+    /** @param common_log_id the log identifier to store */
+    public void setCommon_log_id(long common_log_id) {
+        this.common_log_id = common_log_id;
+    }
+
+    /** @return the stored receive time */
+    public int getCommon_recv_time() {
+        return this.common_recv_time;
+    }
+
+    /** @param common_recv_time the receive time to store */
+    public void setCommon_recv_time(int common_recv_time) {
+        this.common_recv_time = common_recv_time;
+    }
+
+    /** @return the stored schema type, or {@code null} when none was set */
+    public String getCommon_schema_type() {
+        return this.common_schema_type;
+    }
+
+    /** @param common_schema_type the schema type to store */
+    public void setCommon_schema_type(String common_schema_type) {
+        this.common_schema_type = common_schema_type;
+    }
+
+    /** @return the stored array of source entries (the same instance that was set), or {@code null} */
+    public JSONArray getSourceList() {
+        return this.sourceList;
+    }
+
+    /** @param sourceList the array of source entries to store; kept by reference */
+    public void setSourceList(JSONArray sourceList) {
+        this.sourceList = sourceList;
+    }
+
+    /** @return the stored processing time */
+    public int getProcessing_time() {
+        return this.processing_time;
+    }
+
+    /** @param processing_time the processing time to store */
+    public void setProcessing_time(int processing_time) {
+        this.processing_time = processing_time;
+    }
+}
diff --git a/src/main/java/com/zdjizhi/bean/SourceList.java b/src/main/java/com/zdjizhi/bean/SourceList.java
new file mode 100644
index 0000000..8fba85d
--- /dev/null
+++ b/src/main/java/com/zdjizhi/bean/SourceList.java
@@ -0,0 +1,22 @@
+package com.zdjizhi.bean;
+
+/**
+ * Mutable bean pairing the original storage path of a file with the path it is to be
+ * written to.
+ *
+ * <p>Both properties are plain strings that start out as {@code null}; no validation is applied.
+ */
+public class SourceList {
+
+    /** Path the file is to be written to. */
+    private String destination_oss_path;
+
+    /** Path the file is read from. */
+    private String source_oss_path;
+
+    /** @return the stored destination path, or {@code null} when none was set */
+    public String getDestination_oss_path() {
+        return this.destination_oss_path;
+    }
+
+    /** @param destination_oss_path the destination path to store */
+    public void setDestination_oss_path(String destination_oss_path) {
+        this.destination_oss_path = destination_oss_path;
+    }
+
+    /** @return the stored source path, or {@code null} when none was set */
+    public String getSource_oss_path() {
+        return this.source_oss_path;
+    }
+
+    /** @param source_oss_path the source path to store */
+    public void setSource_oss_path(String source_oss_path) {
+        this.source_oss_path = source_oss_path;
+    }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/DealFileProcessFunction.java b/src/main/java/com/zdjizhi/utils/functions/DealFileProcessFunction.java
new file mode 100644
index 0000000..a90f2f2
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/DealFileProcessFunction.java
@@ -0,0 +1,119 @@
+package com.zdjizhi.utils.functions;
+
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.bean.FileMeta;
+import com.zdjizhi.bean.SourceList;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.general.FileEdit;
+import com.zdjizhi.utils.json.JsonTypeUtil;
+
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import java.util.Map;
+
+
+/**
+ * @author wangchengcheng
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/10/14
+ */
+public class DealFileProcessFunction extends ProcessFunction<Map<String, Object>, String> {
+ private static final Log logger = LogFactory.get();
+
+ private String rpUrlValue;
+ private String rqUrlValue;
+ private String emailUrlValue;
+
+ private long cfgId = 0; //= common_policy_id;
+
+ private String sIp = null; // = common_client_ip;
+ private int sPort = 0;// = common_client_port;
+ private String dIp = null;//= common_server_ip;
+ private int dPort = 0;// = common_server_port;
+ private long foundTime = 0;// = common_recv_time;
+ private String account = null;
+ private String domain = null;
+ private String schemaType = null;
+
+
+ //初始化侧输流的标记
+ public static OutputTag<String> metaToKafa = new OutputTag<String>("metaToKafka") {
+ };
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void processElement(Map<String, Object> message, Context context, Collector<String> collector) throws Exception {
+ try {
+ if (message.size() > 0) {
+// jsonMap = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
+// jsonMap = JsonTypeUtil.typeTransform(map);
+ rpUrlValue = (String) message.get("http_response_body");
+ rqUrlValue = (String) message.get("http_request_body");
+ emailUrlValue = (String) message.get("mail_eml_file");
+
+
+ if (StringUtil.isNotBlank(rpUrlValue) || StringUtil.isNotBlank(rqUrlValue) || StringUtil.isNotBlank(emailUrlValue)) {
+ cfgId = (long) message.get("common_policy_id");
+ sIp = (String) message.get("common_client_ip");
+ sPort = (int) message.get("common_client_port");
+ dIp = (String) message.get("common_server_ip");
+ dPort = (int) message.get("common_server_port");
+ foundTime = (long) message.get("common_recv_time");
+ schemaType = (String) message.get("common_schema_type");
+
+ if (StringUtil.isNotBlank((String) message.get("http_domain"))) {
+ domain = message.get("http_domain").toString();
+ } else {
+ domain = "NA";
+ }
+ if (StringUtil.isNotBlank((String) message.get("common_subscribe_id"))) {
+ account = message.get("common_subscribe_id").toString();
+ } else {
+ account = "NA";
+ }
+ FileMeta fileMeta = new FileMeta();
+ JSONArray jsonarray = new JSONArray();
+ if (StringUtil.isNotBlank(rqUrlValue)) {
+ message.put("http_request_body", FileEdit.fileDownloadUrl(rqUrlValue, "_1"));
+ SourceList request = new SourceList();
+ request.setSource_oss_path(rqUrlValue);
+ request.setDestination_oss_path(FileEdit.fileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rqUrlValue, schemaType, "_1"));
+ jsonarray.add(request);
+ }
+ if (StringUtil.isNotBlank(rpUrlValue)) {
+ message.put("http_response_body", FileEdit.fileDownloadUrl(rpUrlValue, "_2"));
+ SourceList response = new SourceList();
+ response.setSource_oss_path(rpUrlValue);
+ response.setDestination_oss_path(FileEdit.fileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rpUrlValue, schemaType, "_2"));
+ jsonarray.add(response);
+ }
+ if (StringUtil.isNotBlank(emailUrlValue)) {
+ message.put("mail_eml_file", FileEdit.fileDownloadUrl(emailUrlValue, "_9"));
+ SourceList emailFile = new SourceList();
+ emailFile.setSource_oss_path(emailUrlValue);
+ emailFile.setDestination_oss_path(FileEdit.fileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, emailUrlValue, schemaType, "_9"));
+ jsonarray.add(emailFile);
+ }
+ fileMeta.setSourceList(jsonarray);
+ fileMeta.setCommon_log_id((long) message.get("common_log_id"));
+ fileMeta.setCommon_recv_time(Integer.parseInt(message.get("common_recv_time").toString()));
+ fileMeta.setCommon_schema_type((String) message.get("common_schema_type"));
+ fileMeta.setProcessing_time((int) (System.currentTimeMillis() / 1000));
+
+ context.output(metaToKafa, JSONObject.toJSONString(fileMeta));
+ }
+ collector.collect(JsonMapper.toJsonString(message));
+ }
+ } catch (RuntimeException e) {
+ logger.error("处理带有非结构结构化字段的日志出错:" + e + "\n" + message);
+ }
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/general/FileEdit.java b/src/main/java/com/zdjizhi/utils/general/FileEdit.java
new file mode 100644
index 0000000..8c3da79
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/general/FileEdit.java
@@ -0,0 +1,47 @@
+package com.zdjizhi.utils.general;
+
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.ordinary.MD5Utils;
+
+import static com.zdjizhi.utils.system.FlowWriteConfigurations.judgeFileType;
+
+
+/**
+ * Utility for deriving upload and download URLs from a file-reference field.
+ *
+ * <p>NOTE(review): the values placed into the URL path (IP addresses, account, domain) are
+ * concatenated as-is, without URL encoding — confirm the receiving service expects that.
+ */
+public class FileEdit {
+
+    /**
+     * Builds the upload URL for a file.
+     *
+     * <p>The file type segment is the extension of {@code urlValue} when {@code judgeFileType}
+     * accepts it; otherwise it is {@code "html"} for schema type {@code "HTTP"} and {@code "eml"}
+     * for schema type {@code "MAIL"}. For any other (or {@code null}) schema type the segment is
+     * rendered as the literal text {@code null}.
+     *
+     * @param cfgId      policy identifier placed in the path
+     * @param sIp        client IP placed in the path
+     * @param sPort      client port placed in the path
+     * @param dIp        server IP placed in the path
+     * @param dPort      server port placed in the path
+     * @param foundTime  receive time placed in the path
+     * @param account    account placed in the path
+     * @param domain     domain placed in the path
+     * @param urlValue   original file reference; supplies the extension and the file name
+     * @param schemaType schema type used to pick a fallback file type; may be {@code null}
+     * @param fileSuffix suffix appended to the hashed file name
+     * @return the upload URL
+     * @throws IllegalArgumentException when {@code urlValue} has no {@code _} in its last segment
+     */
+    public static String fileUploadUrl(long cfgId, String sIp, int sPort, String dIp, int dPort, long foundTime, String account, String domain, String urlValue, String schemaType, String fileSuffix) throws Exception {
+        String fileType = getFileType(urlValue);
+        if (!judgeFileType(fileType)) {
+            // Constant-first comparison keeps a null schema type from raising an NPE.
+            if ("HTTP".equals(schemaType)) {
+                fileType = "html";
+            } else if ("MAIL".equals(schemaType)) {
+                fileType = "eml";
+            } else {
+                fileType = null;
+            }
+        }
+        return "http://" + FlowWriteConfig.OOS_SERVERS + "/upload_v2"
+                + "/" + cfgId
+                + "/" + fileType
+                + "/" + sIp
+                + "/" + sPort
+                + "/" + dIp
+                + "/" + dPort
+                + "/" + foundTime
+                + "/" + account
+                + "/" + domain
+                + "/" + getFileName(urlValue, fileSuffix);
+    }
+
+    /**
+     * Builds the download URL for a file.
+     *
+     * @param urlValue   original file reference; supplies the file name
+     * @param fileSuffix suffix appended to the hashed file name
+     * @return the download URL
+     * @throws IllegalArgumentException when {@code urlValue} has no {@code _} in its last segment
+     */
+    public static String fileDownloadUrl(String urlValue, String fileSuffix) throws Exception {
+        return "http://" + FlowWriteConfig.OOS_SERVERS + "/download_v2" + "/" + getFileName(urlValue, fileSuffix);
+    }
+
+    /**
+     * Returns the text after the last {@code .} of {@code url}.
+     *
+     * @param url the file reference
+     * @return the extension; the whole input when it contains no {@code .}; the empty string when
+     *         the input consists only of dots
+     */
+    public static String getFileType(String url) {
+        String[] parts = url.split("\\.");
+        // split() yields an empty array for input made only of separators.
+        return parts.length == 0 ? "" : parts[parts.length - 1];
+    }
+
+    /**
+     * Derives the stored file name: the MD5 hex digest of the last path segment up to (not
+     * including) its final {@code _}, followed by {@code fileSuffix}.
+     *
+     * @param url        the file reference
+     * @param fileSuffix suffix appended to the digest
+     * @return digest plus suffix
+     * @throws IllegalArgumentException when the last path segment contains no {@code _}
+     */
+    public static String getFileName(String url, String fileSuffix) throws Exception {
+        String[] segments = url.split("/");
+        String lastSegment = segments.length == 0 ? "" : segments[segments.length - 1];
+        int underscore = lastSegment.lastIndexOf('_');
+        if (underscore < 0) {
+            throw new IllegalArgumentException("No '_' in the last path segment of: " + url);
+        }
+        return MD5Utils.md5Encode(lastSegment.substring(0, underscore)) + fileSuffix;
+    }
+}
diff --git a/src/main/java/com/zdjizhi/utils/ordinary/MD5Utils.java b/src/main/java/com/zdjizhi/utils/ordinary/MD5Utils.java
new file mode 100644
index 0000000..aa55951
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/ordinary/MD5Utils.java
@@ -0,0 +1,64 @@
+package com.zdjizhi.utils.ordinary;
+
+import org.apache.log4j.Logger;
+
+import java.security.MessageDigest;
+
+/**
+ * Utility for computing MD5 digests as hexadecimal strings.
+ *
+ * @author Administrator
+ * @create 2018-08-13 15:11
+ */
+public class MD5Utils {
+    private static final Logger logger = Logger.getLogger(MD5Utils.class);
+
+    /**
+     * Computes the MD5 digest of the UTF-8 bytes of {@code msg}.
+     *
+     * <p>This is deliberately best-effort: any failure (including a {@code null} argument) is
+     * logged together with its cause and the empty string is returned.
+     *
+     * @param msg the text to hash
+     * @return the digest as 32 lowercase hexadecimal characters, or {@code ""} on failure
+     * @throws Exception declared for compatibility with existing callers; not thrown by this body
+     */
+    public static String md5Encode(String msg) throws Exception {
+        try {
+            byte[] msgBytes = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+            MessageDigest md5 = MessageDigest.getInstance("MD5");
+            // digest(byte[]) is update(bytes) followed by digest().
+            return byteArr2hexString(md5.digest(msgBytes));
+        } catch (Exception e) {
+            // Pass the exception along so its stack trace reaches the log.
+            logger.error("Error in conversion MD5! " + msg, e);
+            return "";
+        }
+    }
+
+    /**
+     * Converts a byte array to its hexadecimal string form, two lowercase digits per byte.
+     *
+     * @param bys the bytes to convert
+     * @return the hexadecimal string
+     */
+    public static String byteArr2hexString(byte[] bys) {
+        StringBuilder hexVal = new StringBuilder(bys.length * 2);
+        for (byte by : bys) {
+            // Mask with 0xff so a negative byte becomes its unsigned value.
+            int val = by & 0xff;
+            if (val < 16) {
+                hexVal.append('0');
+            }
+            hexVal.append(Integer.toHexString(val));
+        }
+        return hexVal.toString();
+    }
+}