From 163b724a12cf556faa8a851bc7d2a00a71d4c437 Mon Sep 17 00:00:00 2001 From: qidaijie Date: Wed, 14 Jul 2021 13:56:54 +0800 Subject: 1:新增日志字段类型弱校验功能,根据Schema定义的字段类型对原始日志不匹配的字段进行类型转换。 2:增加complete.check.type配置对此功能进行开关。 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- properties/service_flow_config.properties | 14 +- src/main/java/com/zdjizhi/bolt/CompletionBolt.java | 51 ------- .../com/zdjizhi/bolt/CompletionObjectBolt.java | 50 +++++++ .../java/com/zdjizhi/common/FlowWriteConfig.java | 2 +- .../com/zdjizhi/topology/LogFlowWriteTopology.java | 41 ++++-- .../com/zdjizhi/utils/general/TransFormObject.java | 152 +++++++++++++++++++++ .../com/zdjizhi/utils/general/TransFormUtils.java | 149 -------------------- .../com/zdjizhi/utils/general/TransFunction.java | 124 +++++++++++++---- .../java/com/zdjizhi/utils/json/JsonParseUtil.java | 82 ++++++++--- 10 files changed, 403 insertions(+), 264 deletions(-) delete mode 100644 src/main/java/com/zdjizhi/bolt/CompletionBolt.java create mode 100644 src/main/java/com/zdjizhi/bolt/CompletionObjectBolt.java create mode 100644 src/main/java/com/zdjizhi/utils/general/TransFormObject.java delete mode 100644 src/main/java/com/zdjizhi/utils/general/TransFormUtils.java diff --git a/pom.xml b/pom.xml index 8b89a8e..15bcbca 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.zdjizhi log-stream-completion-schema - v3.21.06.28-jackson + v3.21.07.13-map jar log-stream-completion-schema diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index e5cd76a..06e5a54 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -13,20 +13,22 @@ zookeeper.servers=192.168.44.12:2181 hbase.zookeeper.servers=192.168.44.12:2181 #--------------------------------HTTP/定位库------------------------------# + #定位库地址 ip.library=D:\\K18-Phase2\\tsgSpace\\dat\\ #ip.library=/home/bigdata/topology/dat/ #网关的schema位置 -schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/connection_record_log +#schema.http=http://192.168.40.203:9999/metadata/schema/v1/fields/connection_record_log +schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/connection_record_log #网关APP_ID 获取接口 -app.id.http=http://192.168.44.67:9999/open-api/appDicList +app.id.http=http://192.168.44.12:9999/open-api/appDicList #--------------------------------Kafka消费组信息------------------------------# #kafka 接收数据topic -kafka.topic=CONNECTION-RECORD-LOG +kafka.topic=test #补全数据 输出 topic results.output.topic=test-result @@ -92,12 +94,12 @@ topology.tick.tuple.freq.secs=5 #spout接收睡眠时间 topology.spout.sleep.time=1 -#允许发送kafka最大失败数 -max.failure.num=20 - #邮件默认编码 mail.default.charset=UTF-8 #需不要补全,不需要则原样日志输出 log.need.complete=yes +#补全校验类型 0 强制类型校验;1 弱类型校验;2 不校验 +complete.check.type=1 + diff --git a/src/main/java/com/zdjizhi/bolt/CompletionBolt.java b/src/main/java/com/zdjizhi/bolt/CompletionBolt.java deleted file mode 100644 index d1ca4fa..0000000 --- a/src/main/java/com/zdjizhi/bolt/CompletionBolt.java +++ /dev/null @@ -1,51 +0,0 @@ -package com.zdjizhi.bolt; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.zdjizhi.utils.StringUtil; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.BasicOutputCollector; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseBasicBolt; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import 
org.apache.storm.tuple.Values; - -import java.util.Map; - -import static com.zdjizhi.utils.general.TransFormUtils.dealCommonMessage; - -/** - * @author qidaijie - */ - -public class CompletionBolt extends BaseBasicBolt { - private static final long serialVersionUID = 9006119186526123734L; - private static final Log logger = LogFactory.get(); - - - @Override - public void prepare(Map stormConf, TopologyContext context) { - - } - - - @Override - public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { - try { - String message = tuple.getString(0); - if (StringUtil.isNotBlank(message)) { - basicOutputCollector.emit(new Values(dealCommonMessage(message))); - } - } catch (RuntimeException e) { - logger.error("处理原始日志下发过程异常,异常信息:" + e); - } - } - - - @Override - public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { - outputFieldsDeclarer.declare(new Fields("tsgLog")); - } - -} diff --git a/src/main/java/com/zdjizhi/bolt/CompletionObjectBolt.java b/src/main/java/com/zdjizhi/bolt/CompletionObjectBolt.java new file mode 100644 index 0000000..9b722a8 --- /dev/null +++ b/src/main/java/com/zdjizhi/bolt/CompletionObjectBolt.java @@ -0,0 +1,50 @@ +package com.zdjizhi.bolt; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.general.TransFormObject; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.util.Map; + +/** + * @author qidaijie + */ + +public class CompletionObjectBolt extends BaseBasicBolt { + private static final long serialVersionUID = 9006119186526123734L; + private static final Log logger = LogFactory.get(); + + + @Override + public void prepare(Map stormConf, TopologyContext context) { + + } + + + @Override + public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { + try { + String message = tuple.getString(0); + if (StringUtil.isNotBlank(message)) { + basicOutputCollector.emit(new Values(TransFormObject.dealCommonMessage(message))); + } + } catch (RuntimeException e) { + logger.error("处理原始日志下发过程异常,异常信息:" + e); + } + } + + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields("tsgLog")); + } + +} diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java index fc2e116..ffb2795 100644 --- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java +++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java @@ -30,9 +30,9 @@ public class FlowWriteConfig { public static final Integer TOPOLOGY_SPOUT_SLEEP_TIME = FlowWriteConfigurations.getIntProperty(0, "topology.spout.sleep.time"); public static final Integer BATCH_INSERT_NUM = FlowWriteConfigurations.getIntProperty(0, "batch.insert.num"); public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num"); - public static final Integer MAX_FAILURE_NUM = FlowWriteConfigurations.getIntProperty(0, "max.failure.num"); public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(0, "mail.default.charset"); public static final String LOG_NEED_COMPLETE = FlowWriteConfigurations.getStringProperty(0, 
"log.need.complete"); + public static final Integer COMPLETE_CHECK_TYPE = FlowWriteConfigurations.getIntProperty(0, "complete.check.type"); /** * kafka diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java index 036f922..be2616b 100644 --- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java +++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java @@ -1,7 +1,9 @@ package com.zdjizhi.topology; -import com.zdjizhi.bolt.CompletionBolt; +import com.zdjizhi.bolt.CompletionMapBolt; +import com.zdjizhi.bolt.CompletionTypeMapBolt; +import com.zdjizhi.bolt.CompletionObjectBolt; import com.zdjizhi.bolt.LogSendBolt; import com.zdjizhi.common.DefaultProConfig; import com.zdjizhi.common.FlowWriteConfig; @@ -51,9 +53,9 @@ public class LogFlowWriteTopology { private void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { topologyConfig.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS); - topologyConfig.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE,DefaultProConfig.TRANSFER_BUFFER_SIZE); - topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE,DefaultProConfig.EXECUTOR_RECEIVE_BUFFER_SIZE); - topologyConfig.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE,DefaultProConfig.EXECUTOR_SEND_BUFFER_SIZE); + topologyConfig.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, DefaultProConfig.TRANSFER_BUFFER_SIZE); + topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, DefaultProConfig.EXECUTOR_RECEIVE_BUFFER_SIZE); + topologyConfig.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, DefaultProConfig.EXECUTOR_SEND_BUFFER_SIZE); StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig); } @@ -62,12 +64,33 @@ public class LogFlowWriteTopology { builder = new TopologyBuilder(); builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM); if (need.equals(FlowWriteConfig.LOG_NEED_COMPLETE)) { - builder.setBolt("LogCompletionBolt", new CompletionBolt(), - FlowWriteConfig.COMPLETION_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); - builder.setBolt("LogSendBolt", new LogSendBolt(), - FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("LogCompletionBolt"); + switch (FlowWriteConfig.COMPLETE_CHECK_TYPE) { + case 0: + builder.setBolt("LogObjectCompletionBolt", new CompletionObjectBolt(), + FlowWriteConfig.COMPLETION_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); + builder.setBolt("SendKafkaBolt", new LogSendBolt(), + FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("LogObjectCompletionBolt"); + break; + case 1: + builder.setBolt("LogTypeMapCompletionBolt", new CompletionTypeMapBolt(), + FlowWriteConfig.COMPLETION_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); + builder.setBolt("SendKafkaBolt", new LogSendBolt(), + FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("LogTypeMapCompletionBolt"); + break; + case 2: + builder.setBolt("LogMapCompletionBolt", new CompletionMapBolt(), + FlowWriteConfig.COMPLETION_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); + builder.setBolt("SendKafkaBolt", new LogSendBolt(), + FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("LogMapCompletionBolt"); + break; + default: + builder.setBolt("LogCompletionBolt", new CompletionObjectBolt(), + FlowWriteConfig.COMPLETION_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); + builder.setBolt("SendKafkaBolt", new LogSendBolt(), + 
FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("LogCompletionBolt"); + } } else { - builder.setBolt("LogSendBolt", new LogSendBolt(), + builder.setBolt("SendKafkaBolt", new LogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout"); } } diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormObject.java b/src/main/java/com/zdjizhi/utils/general/TransFormObject.java new file mode 100644 index 0000000..3421b68 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/general/TransFormObject.java @@ -0,0 +1,152 @@ +package com.zdjizhi.utils.general; + + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.json.JsonParseUtil; + +import java.util.ArrayList; +import java.util.HashMap; + + +/** + * 描述:转换或补全工具类 + * + * @author qidaijie + */ +public class TransFormObject { + private static final Log logger = LogFactory.get(); + + /** + * 在内存中加载反射类用的map + */ + private static HashMap map = JsonParseUtil.getMapFromHttp(FlowWriteConfig.SCHEMA_HTTP); + + /** + * 反射成一个类 + */ + private static Object mapObject = JsonParseUtil.generateObject(map); + + /** + * 获取任务列表 + * list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如: + * (mail_subject mail_subject decode_of_base64 mail_subject_charset) + */ + private static ArrayList jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP); + + /** + * 解析日志,并补全 + * + * @param message kafka Topic原始日志 + * @return 补全后的日志 + */ + public static String dealCommonMessage(String message) { + try { +// Object object = JsonMapper.fromJsonString(message, mapObject.getClass()); + Object object = JSONObject.parseObject(message, mapObject.getClass()); + for (String[] strings : jobList) { + //用到的参数的值 + Object name = JsonParseUtil.getValue(object, strings[0]); + //需要补全的字段的key + String appendToKeyName = strings[1]; + //需要补全的字段的值 + Object appendTo = JsonParseUtil.getValue(object, appendToKeyName); + //匹配操作函数的字段 + String function = strings[2]; + //额外的参数的值 + String param = strings[3]; + functionSet(function, object, appendToKeyName, appendTo, name, param); + } +// return JsonMapper.toJsonString(object); + return JSONObject.toJSONString(object); + } catch (RuntimeException e) { + logger.error("解析补全日志信息过程异常,异常信息:" + e + "\n" + message); + return ""; + } + } + + + /** + * 根据schema描述对应字段进行操作的 函数集合 + * + * @param function 匹配操作函数的字段 + * @param object 动态POJO Object + * @param appendToKeyName 需要补全的字段的key + * @param appendTo 需要补全的字段的值 + * @param name 用到的参数的值 + * @param param 额外的参数的值 + */ + private static void functionSet(String function, Object object, String appendToKeyName, Object appendTo, Object name, String param) { + switch (function) { + case "current_timestamp": + if (!(appendTo instanceof Long)) { + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getCurrentTime()); + } + break; + case "snowflake_id": + JsonParseUtil.setValue(object, appendToKeyName, SnowflakeId.generateId()); + break; + case "geo_ip_detail": + if (name != null && appendTo == null) { + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoIpDetail(name.toString())); + } + break; + case "geo_asn": + if (name != null && appendTo == null) { + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoAsn(name.toString())); + } + break; + case "geo_ip_country": + if (name != null && appendTo == null) { + 
JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoIpCountry(name.toString())); + } + break; + case "set_value": + if (name != null && param != null) { + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.setValue(param)); + } + break; + case "get_value": + if (name != null) { + JsonParseUtil.setValue(object, appendToKeyName, name); + } + break; + case "if": + if (param != null) { + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.condition(object, param)); + } + break; + case "sub_domain": + if (appendTo == null && name != null) { + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getTopDomain(name.toString())); + } + break; + case "radius_match": + if (name != null) { + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.radiusMatch(name.toString())); + } + break; + case "app_match": + if (name != null && appendTo == null) { + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.appMatch(name.toString())); + } + break; + case "decode_of_base64": + if (name != null) { + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.decodeBase64(name.toString(), TransFunction.isJsonValue(object, param))); + } + break; + case "flattenSpec": + if (name != null && param != null) { + JsonParseUtil.setValue(object, appendToKeyName, TransFunction.flattenSpec(name.toString(), param)); + } + break; + default: + } + } + +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java b/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java deleted file mode 100644 index 5bfda89..0000000 --- a/src/main/java/com/zdjizhi/utils/general/TransFormUtils.java +++ /dev/null @@ -1,149 +0,0 @@ -package com.zdjizhi.utils.general; - - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.json.JsonParseUtil; - -import java.util.ArrayList; -import java.util.HashMap; - - -/** - * 描述:转换或补全工具类 - * - * @author qidaijie - */ -public class TransFormUtils { - private static final Log logger = LogFactory.get(); - - /** - * 在内存中加载反射类用的map - */ - private static HashMap map = JsonParseUtil.getMapFromHttp(FlowWriteConfig.SCHEMA_HTTP); - - /** - * 反射成一个类 - */ - private static Object mapObject = JsonParseUtil.generateObject(map); - - /** - * 获取任务列表 - * list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如: - * (mail_subject mail_subject decode_of_base64 mail_subject_charset) - */ - private static ArrayList jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP); - - /** - * 解析日志,并补全 - * - * @param message kafka Topic原始日志 - * @return 补全后的日志 - */ - public static String dealCommonMessage(String message) { - try { - Object object = JsonMapper.fromJsonString(message, mapObject.getClass()); - for (String[] strings : jobList) { - //用到的参数的值 - Object name = JsonParseUtil.getValue(object, strings[0]); - //需要补全的字段的key - String appendToKeyName = strings[1]; - //需要补全的字段的值 - Object appendTo = JsonParseUtil.getValue(object, appendToKeyName); - //匹配操作函数的字段 - String function = strings[2]; - //额外的参数的值 - String param = strings[3]; - functionSet(function, object, appendToKeyName, appendTo, name, param); - } - return JsonMapper.toJsonString(object); - } catch (RuntimeException e) { - logger.error("解析补全日志信息过程异常,异常信息:" + e); - e.printStackTrace(); - return ""; - } - } - - - /** - * 根据schema描述对应字段进行操作的 函数集合 - * - * @param function 匹配操作函数的字段 - * @param object 动态POJO Object - * @param appendToKeyName 
需要补全的字段的key - * @param appendTo 需要补全的字段的值 - * @param name 用到的参数的值 - * @param param 额外的参数的值 - */ - private static void functionSet(String function, Object object, String appendToKeyName, Object appendTo, Object name, String param) { - switch (function) { - case "current_timestamp": - if (! (appendTo instanceof Long)) { - JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getCurrentTime()); - } - break; - case "snowflake_id": - JsonParseUtil.setValue(object, appendToKeyName, SnowflakeId.generateId()); - break; - case "geo_ip_detail": - if (name != null && appendTo == null) { - JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoIpDetail(name.toString())); - } - break; - case "geo_asn": - if (name != null && appendTo == null) { - JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoAsn(name.toString())); - } - break; - case "geo_ip_country": - if (name != null && appendTo == null) { - JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoIpCountry(name.toString())); - } - break; - case "set_value": - if (name != null && param != null) { - JsonParseUtil.setValue(object, appendToKeyName, TransFunction.setValue(param)); - } - break; - case "get_value": - if (name != null) { - JsonParseUtil.setValue(object, appendToKeyName, name); - } - break; - case "if": - if (param != null) { - JsonParseUtil.setValue(object, appendToKeyName, TransFunction.condition(object, param)); - } - break; - case "sub_domain": - if (appendTo == null && name != null) { - JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getTopDomain(name.toString())); - } - break; - case "radius_match": - if (name != null) { - JsonParseUtil.setValue(object, appendToKeyName, TransFunction.radiusMatch(name.toString())); - } - break; - case "app_match": - if (name != null && appendTo == null) { - JsonParseUtil.setValue(object, appendToKeyName, TransFunction.appMatch(name.toString())); - } - break; - case "decode_of_base64": - if (name != null) { - JsonParseUtil.setValue(object, appendToKeyName, TransFunction.decodeBase64(name.toString(), TransFunction.isJsonValue(object, param))); - } - break; - case "flattenSpec": - if (name != null && param != null) { - JsonParseUtil.setValue(object, appendToKeyName, TransFunction.flattenSpec(name.toString(), TransFunction.isJsonValue(object, param))); - } - break; - default: - } - } - -} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java index 0e9fb93..7cc0e8f 100644 --- a/src/main/java/com/zdjizhi/utils/general/TransFunction.java +++ b/src/main/java/com/zdjizhi/utils/general/TransFunction.java @@ -15,8 +15,10 @@ import com.zdjizhi.utils.StringUtil; import com.zdjizhi.utils.app.AppUtils; import com.zdjizhi.utils.hbase.HBaseUtils; import com.zdjizhi.utils.json.JsonParseUtil; +import com.zdjizhi.utils.json.TypeUtils; import java.util.ArrayList; +import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -136,14 +138,14 @@ class TransFunction { * @param charset 编码 * @return 解码字符串 */ - static String decodeBase64(String message, String charset) { + static String decodeBase64(String message, Object charset) { String result = ""; try { if (StringUtil.isNotBlank(message)) { - if (StringUtil.isNotBlank(charset)) { - result = Base64.decodeStr(message, charset); - } else { + if (charset == null) { result = Base64.decodeStr(message, FlowWriteConfig.MAIL_DEFAULT_CHARSET); + } else { + result = 
Base64.decodeStr(message, charset.toString()); } } } catch (RuntimeException rune) { @@ -180,14 +182,24 @@ class TransFunction { * @param param 字段名/普通字符串 * @return JSON.Value or String */ - static String isJsonValue(Object object, String param) { + static Object isJsonValue(Object object, String param) { if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) { - Object value = JsonParseUtil.getValue(object, param.substring(2)); - if (value != null) { - return value.toString(); - } else { - return ""; - } + return JsonParseUtil.getValue(object, param.substring(2)); + } else { + return param; + } + } + + /** + * 判断是否为日志字段,是则返回对应value,否则返回原始字符串 + * + * @param jsonMap 内存实体类 + * @param param 字段名/普通字符串 + * @return JSON.Value or String + */ + static Object isJsonValue(Map jsonMap, String param) { + if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) { + return JsonParseUtil.getValue(jsonMap, param.substring(2)); } else { return param; } @@ -198,32 +210,92 @@ class TransFunction { * * @param object 内存实体类 * @param ifParam 字段名/普通字符串 - * @return resultA or resultB or "" + * @return resultA or resultB or null */ static Object condition(Object object, String ifParam) { + Object result = null; try { String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER); - String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER); - String direction = isJsonValue(object, norms[0]); - if (StringUtil.isNotBlank(direction)) { - if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) { - String resultA = isJsonValue(object, split[1]); - String resultB = isJsonValue(object, split[2]); - String result = (Integer.parseInt(direction) == Integer.parseInt(norms[1])) ? resultA : resultB; - Matcher isNum = PATTERN.matcher(result); - if (isNum.matches()) { - return Long.parseLong(result); - } else { - return result; - } + if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) { + String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER); + Object direction = isJsonValue(object, norms[0]); + Object resultA = isJsonValue(object, split[1]); + Object resultB = isJsonValue(object, split[2]); + if (direction instanceof Number) { +// result = (Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB; + result = TypeUtils.castToIfFunction((Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB); + } else if (direction instanceof String) { + result = TypeUtils.castToIfFunction(direction.equals(norms[1]) ? resultA : resultB); +// result = direction.equals(norms[1]) ? resultA : resultB; } } } catch (RuntimeException e) { logger.error("IF 函数执行异常,异常信息:" + e); } - return null; + return result; } + /** + * IF函数实现,解析日志构建三目运算;包含判断是否为数字若为数字则转换为long类型返回结果。 + * + * @param jsonMap 内存实体类 + * @param ifParam 字段名/普通字符串 + * @return resultA or resultB or null + */ + static Object condition(Map jsonMap, String ifParam) { + Object result = null; + try { + String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER); + if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) { + String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER); + Object direction = isJsonValue(jsonMap, norms[0]); + Object resultA = isJsonValue(jsonMap, split[1]); + Object resultB = isJsonValue(jsonMap, split[2]); + if (direction instanceof Number) { + result = (Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB; +// result = TypeUtils.castToIfFunction((Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? 
resultA : resultB); + } else if (direction instanceof String) { +// result = TypeUtils.castToIfFunction(direction.equals(norms[1]) ? resultA : resultB); + result = direction.equals(norms[1]) ? resultA : resultB; + } + } + } catch (RuntimeException e) { + logger.error("IF 函数执行异常,异常信息:" + e); + } + return result; + } + +// /** +// * IF函数实现,解析日志构建三目运算;包含判断是否为数字若为数字则转换为long类型返回结果。 +// * +// * @param jsonMap 原始日志 +// * @param ifParam 字段名/普通字符串 +// * @return resultA or resultB or null +// */ +// static Object condition(Map jsonMap, String ifParam) { +// try { +// String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER); +// String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER); +// String direction = isJsonValue(jsonMap, norms[0]); +// if (StringUtil.isNotBlank(direction)) { +// if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) { +// String resultA = isJsonValue(jsonMap, split[1]); +// String resultB = isJsonValue(jsonMap, split[2]); +// String result = (Integer.parseInt(direction) == Integer.parseInt(norms[1])) ? resultA : resultB; +// Matcher isNum = PATTERN.matcher(result); +// if (isNum.matches()) { +// return Long.parseLong(result); +// } else { +// return result; +// } +// } +// } +// } catch (RuntimeException e) { +// logger.error("IF 函数执行异常,异常信息:" + e); +// } +// return null; +// } + /** * 设置固定值函数 若为数字则转为long返回 * diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java index 07ee2e5..7316b92 100644 --- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java +++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java @@ -21,9 +21,10 @@ import java.util.*; * @author qidaijie */ public class JsonParseUtil { + private static final Log logger = LogFactory.get(); - private static List dropFieldList = new ArrayList<>(); + private static ArrayList dropList = new ArrayList<>(); /** * 模式匹配,给定一个类型字符串返回一个类类型 @@ -39,17 +40,14 @@ public class JsonParseUtil { case "int": clazz = Integer.class; break; - case "String": + case "string": clazz = String.class; break; case "long": clazz = long.class; break; case "array": - clazz = JSONArray.class; - break; - case "Integer": - clazz = Integer.class; + clazz = List.class; break; case "double": clazz = double.class; @@ -76,31 +74,51 @@ public class JsonParseUtil { } /** - * 根据反射生成对象的方法 + * 获取属性值的方法 * - * @param properties 反射类用的map - * @return 生成的Object类型的对象 + * @param obj 对象 + * @param property key + * @return 属性的值 */ - public static Object generateObject(Map properties) { - BeanGenerator generator = new BeanGenerator(); - Set keySet = properties.keySet(); - for (Object aKeySet : keySet) { - String key = (String) aKeySet; - generator.addProperty(key, (Class) properties.get(key)); + public static Object getValue(Object obj, String property) { + try { + BeanMap beanMap = BeanMap.create(obj); + return beanMap.get(property); + } catch (RuntimeException e) { + logger.error("获取json-value异常,异常key:" + property + "异常信息为:" + e); + return null; } - return generator.create(); } /** * 获取属性值的方法 * - * @param obj 对象 + * @param jsonMap 原始日志 * @param property key * @return 属性的值 */ - public static Object getValue(Object obj, String property) { - BeanMap beanMap = BeanMap.create(obj); - return beanMap.get(property); + public static Object getValue(Map jsonMap, String property) { + try { + return jsonMap.getOrDefault(property, null); + } catch (RuntimeException e) { + logger.error("获取json-value异常,异常key:" + property + "异常信息为:" + e); + return null; + } + } + + /** + * 更新属性值的方法 + * + * 
@param jsonMap 原始日志json map + * @param property 更新的key + * @param value 更新的值 + */ + public static void setValue(Map jsonMap, String property, Object value) { + try { + jsonMap.put(property, value); + } catch (RuntimeException e) { + logger.error("赋予实体类错误类型数据", e); + } } /** @@ -119,6 +137,22 @@ public class JsonParseUtil { } } + /** + * 根据反射生成对象的方法 + * + * @param properties 反射类用的map + * @return 生成的Object类型的对象 + */ + public static Object generateObject(Map properties) { + BeanGenerator generator = new BeanGenerator(); + Set keySet = properties.keySet(); + for (Object aKeySet : keySet) { + String key = (String) aKeySet; + generator.addProperty(key, (Class) properties.get(key)); + } + return generator.create(); + } + /** * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 * @@ -146,7 +180,7 @@ public class JsonParseUtil { //组合用来生成实体类的map map.put(name, getClassName(type)); } else { - dropFieldList.add(JsonPath.read(filedStr, "$.name").toString()); + dropList.add(filedStr); } } return map; @@ -173,6 +207,12 @@ public class JsonParseUtil { return isKeepField; } + static void dropJsonField(Map jsonMap) { + for (String field : dropList) { + jsonMap.remove(field); + } + } + /** * 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList paramlist) * -- cgit v1.2.3
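
Note on the weak type check (complete.check.type=1): the hunks above wire in CompletionTypeMapBolt and TypeUtils but do not include their bodies, so the snippet below is only a rough sketch of the behaviour described in the commit message — coercing each value of the parsed log map to the class declared by the schema, and dropping values that cannot be converted instead of failing the tuple. Every name in it (WeakTypeCaster, coerce, the sample field names) is hypothetical and not taken from this patch.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Illustrative sketch only; the real logic lives in CompletionTypeMapBolt /
 * TypeUtils, which are not shown in the hunks above.
 */
public final class WeakTypeCaster {

    /**
     * Coerce each value in the parsed log to the class declared in the schema.
     * Matching values are left untouched; values that cannot be converted are
     * replaced with null so downstream bolts never see a mismatched type.
     */
    public static void coerce(Map<String, Object> jsonMap, Map<String, Class<?>> schema) {
        for (Map.Entry<String, Class<?>> field : schema.entrySet()) {
            Object value = jsonMap.get(field.getKey());
            if (value == null || field.getValue().isInstance(value)) {
                continue;
            }
            jsonMap.put(field.getKey(), cast(value, field.getValue()));
        }
    }

    private static Object cast(Object value, Class<?> target) {
        try {
            String text = value.toString().trim();
            if (target == Long.class || target == long.class) {
                return Long.parseLong(text);
            }
            if (target == Integer.class || target == int.class) {
                return Integer.parseInt(text);
            }
            if (target == Double.class || target == double.class) {
                return Double.parseDouble(text);
            }
            if (target == String.class) {
                return text;
            }
            if (target == List.class && value instanceof List) {
                return value;
            }
            // No safe conversion known for this schema type.
            return null;
        } catch (RuntimeException e) {
            // Weak check: drop the unparseable value instead of failing the tuple.
            return null;
        }
    }

    public static void main(String[] args) {
        // Hypothetical schema and raw log, for illustration only.
        Map<String, Class<?>> schema = new HashMap<>();
        schema.put("session_id", Long.class);
        schema.put("client_ip", String.class);

        Map<String, Object> log = new HashMap<>();
        log.put("session_id", "1234567890");   // arrives as a string in the raw log
        log.put("client_ip", "192.168.44.12");

        coerce(log, schema);
        System.out.println(log); // session_id is now a Long
    }
}
```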