diff options
| author | qidaijie <[email protected]> | 2023-04-14 19:00:46 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2023-04-14 19:00:46 +0800 |
| commit | 3dce56828ca0875035ce1ddbbc2cec427840d9df (patch) | |
| tree | 4b64e1ac6b4e1c1aaeb1c1fc9cf6b22bd3880390 | |
| parent | 338c03f6eb9dd75dd4fe9e46d14720d27a1be29d (diff) | |
Live Traffic Chart使用Fastjson2测试版本FastJson2-test
23 files changed, 861 insertions, 691 deletions
@@ -6,7 +6,7 @@ <groupId>com.zdjizhi</groupId> <artifactId>log-olap-analysis-schema</artifactId> - <version>220822-VSYS</version> + <version>230414-FastJson2</version> <name>log-olap-analysis-schema</name> <url>http://www.example.com</url> @@ -16,7 +16,7 @@ <repository> <id>nexus</id> <name>Team Nexus Repository</name> - <url>http://192.168.40.125:8099/content/groups/public</url> + <url>http://192.168.40.153:8099/content/groups/public</url> </repository> <repository> @@ -40,6 +40,7 @@ <hbase.version>2.2.3</hbase.version> <nacos.version>1.2.0</nacos.version> <zdjz.tools.version>1.0.8</zdjz.tools.version> + <fastjson.version>2.0.26</fastjson.version> <scope.type>provided</scope.type> <!--<scope.type>compile</scope.type>--> </properties> @@ -188,7 +189,7 @@ <dependency> <groupId>com.jayway.jsonpath</groupId> <artifactId>json-path</artifactId> - <version>2.4.0</version> + <version>2.7.0</version> </dependency> <dependency> @@ -209,6 +210,20 @@ <groupId>com.alibaba.nacos</groupId> <artifactId>nacos-client</artifactId> <version>${nacos.version}</version> + <exclusions> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> </dependency> <dependency> @@ -218,6 +233,18 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.datasketches</groupId> + <artifactId>datasketches-java</artifactId> + <version>3.2.0</version> + </dependency> + + <dependency> + <groupId>com.alibaba</groupId> + <artifactId>fastjson</artifactId> + <version>${fastjson.version}</version> + </dependency> + </dependencies> </project> diff --git a/properties/default_config.properties b/properties/default_config.properties index c0f8aef..73cffc5 100644 --- a/properties/default_config.properties +++ b/properties/default_config.properties @@ -3,7 +3,7 @@ session.timeout.ms=60000 #kafka source poll -max.poll.records=3000 +max.poll.records=5000 #kafka source poll bytes max.partition.fetch.bytes=31457280 diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 97438d9..c5794bd 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -15,41 +15,42 @@ tools.library=D:\\workerspace\\dat nacos.server=192.168.44.12:8848 #nacos namespace -nacos.schema.namespace=prod +nacos.schema.namespace=livecharts #nacos data id -nacos.data.id=liveChart_session.json +nacos.data.id=liveChart_session_test.json #--------------------------------Kafka消费组信息------------------------------# #kafka 接收数据topic -source.kafka.topic=SESSION-RECORD +source.kafka.topic=test #补全数据 输出 topic -sink.kafka.topic=test-result +sink.kafka.topic=TRAFFIC-PROTOCOL-TEST #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; -group.id=livecharts-test-20220816-1 +group.id=livecharts-test-20230327-3 #--------------------------------topology配置------------------------------# #consumer 并行度 -source.parallelism=1 +source.parallelism=3 #map函数并行度 -parse.parallelism=1 +parse.parallelism=3 #第一次窗口计算并行度 -first.window.parallelism=1 +first.window.parallelism=3 #第二次窗口计算并行度 -second.window.parallelism=1 +second.window.parallelism=3 #producer 并行度 -sink.parallelism=1 +sink.parallelism=3 #初次随机预聚合窗口时间 first.count.window.time=5 #二次聚合窗口时间 second.count.window.time=15 + diff --git a/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java index 0243f97..9ea9df5 100644 --- a/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java +++ b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java @@ -15,8 +15,24 @@ public class StreamAggregateConfig { encryptor.setPassword("galaxy"); } + /** + * 默认的切分符号 + */ public static final String FORMAT_SPLITTER = ","; + /** + * 协议分隔符,需要转义 + */ public static final String PROTOCOL_SPLITTER = "\\."; + /** + * 标识字段为日志字段还是schema指定字段 + */ + public static final String IS_JSON_KEY_TAG = "$."; + + /** + * if函数连接分隔符 + */ + public static final String IF_CONDITION_SPLITTER = "="; + /** * Nacos @@ -27,7 +43,7 @@ public class StreamAggregateConfig { public static final String NACOS_PIN = StreamAggregateConfigurations.getStringProperty(1, "nacos.pin"); public static final String NACOS_GROUP = StreamAggregateConfigurations.getStringProperty(1, "nacos.group"); public static final String NACOS_USERNAME = StreamAggregateConfigurations.getStringProperty(1, "nacos.username"); - + /** * System */ diff --git a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java index c2d4f31..570ea93 100644 --- a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java +++ b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java @@ -2,11 +2,12 @@ package com.zdjizhi.topology; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.alibaba.fastjson2.JSONObject; import com.zdjizhi.common.StreamAggregateConfig; -import com.zdjizhi.utils.functions.*; import com.zdjizhi.utils.functions.keyby.FirstKeyByFunction; import com.zdjizhi.utils.functions.keyby.SecondKeyByFunction; import com.zdjizhi.utils.functions.parse.ParseMapFunction; +import com.zdjizhi.utils.functions.result.ResultFlatMapFunction; import com.zdjizhi.utils.functions.statistics.FirstCountWindowFunction; import com.zdjizhi.utils.functions.statistics.SecondCountWindowFunction; import com.zdjizhi.utils.kafka.KafkaConsumer; @@ -21,6 +22,8 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTime import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import java.util.Map; + /** * @author qidaijie @@ -38,29 +41,37 @@ public class StreamAggregateTopology { //两个输出之间的最大时间 (单位milliseconds) environment.setBufferTimeout(StreamAggregateConfig.BUFFER_TIMEOUT); + //解析原始日志 DataStream<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer()) .setParallelism(StreamAggregateConfig.SOURCE_PARALLELISM).name(StreamAggregateConfig.SOURCE_KAFKA_TOPIC); - SingleOutputStreamOperator<Tuple3<String, String, String>> parseDataMap = streamSource.map(new ParseMapFunction()) + //解析原始日志初步聚合计算,增加自定义key 缓解数据倾斜 + SingleOutputStreamOperator<Tuple3<String, String, JSONObject>> parseDataMap = streamSource.map(new ParseMapFunction()) .name("ParseDataMap") .setParallelism(StreamAggregateConfig.PARSE_PARALLELISM); - WindowedStream<Tuple3<String, String, String>, String, TimeWindow> firstWindow = parseDataMap.keyBy(new FirstKeyByFunction()) + //初步聚合计算,增加自定义key 缓解数据倾斜 + WindowedStream<Tuple3<String, String, JSONObject>, String, TimeWindow> firstWindow = parseDataMap.keyBy(new FirstKeyByFunction()) .window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.FIRST_COUNT_WINDOW_TIME))); - SingleOutputStreamOperator<Tuple2<String, String>> metricCountWindow = firstWindow.process(new FirstCountWindowFunction()) + //初次聚合计算窗口 + SingleOutputStreamOperator<Tuple2<String, JSONObject>> metricCountWindow = firstWindow.process(new FirstCountWindowFunction()) .name("FirstCountWindow") .setParallelism(StreamAggregateConfig.FIRST_WINDOW_PARALLELISM); - WindowedStream<Tuple2<String, String>, String, TimeWindow> secondWindow = metricCountWindow.keyBy(new SecondKeyByFunction()) + //二次聚合计算,使用业务的key 进行数据汇总 + WindowedStream<Tuple2<String, JSONObject>, String, TimeWindow> secondWindow = metricCountWindow.keyBy(new SecondKeyByFunction()) .window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.SECOND_COUNT_WINDOW_TIME))); - SingleOutputStreamOperator<String> secondCountWindow = secondWindow.process(new SecondCountWindowFunction()) + //二次聚合计算窗口 + SingleOutputStreamOperator<JSONObject> secondCountWindow = secondWindow.process(new SecondCountWindowFunction()) .name("SecondCountWindow").setParallelism(StreamAggregateConfig.SECOND_WINDOW_PARALLELISM); + //拆解结果数据按protocol id循环输出 SingleOutputStreamOperator<String> resultFlatMap = secondCountWindow.flatMap(new ResultFlatMapFunction()) .name("ResultFlatMap").setParallelism(StreamAggregateConfig.SINK_PARALLELISM); + //输出到kafka resultFlatMap.addSink(KafkaProducer.getKafkaProducer()).name("LogSinkKafka") .setParallelism(StreamAggregateConfig.SINK_PARALLELISM).name(StreamAggregateConfig.SINK_KAFKA_TOPIC); diff --git a/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java deleted file mode 100644 index 57e8a2c..0000000 --- a/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.zdjizhi.utils.functions; - -import com.zdjizhi.common.StreamAggregateConfig; -import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.StringUtil; -import com.zdjizhi.utils.json.JsonParseUtil; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.util.Collector; - -import java.util.Arrays; -import java.util.Map; - -/** - * @author qidaijie - * @Package com.zdjizhi.utils.functions - * @Description: - * @date 2021/7/2114:52 - */ -public class ResultFlatMapFunction implements FlatMapFunction<String, String> { - private static String[] jobList = JsonParseUtil.getHierarchy(); - private static final String APP_NAME = "app_name"; - - @Override - @SuppressWarnings("unchecked") - public void flatMap(String value, Collector out) throws Exception { - StringBuffer stringBuffer = new StringBuffer(); - String name = jobList[0]; - Map<String, Object> jsonObject = (Map<String, Object>) JsonMapper.fromJsonString(value, Map.class); - String protocol = JsonParseUtil.getString(jsonObject, name); - String appName = JsonParseUtil.getString(jsonObject, APP_NAME); - jsonObject.remove(APP_NAME); - if (StringUtil.isNotBlank(protocol)) { - String[] protocolIds = protocol.split(StreamAggregateConfig.PROTOCOL_SPLITTER); - for (String proto : protocolIds) { - if (StringUtil.isBlank(stringBuffer.toString())) { - stringBuffer.append(proto); - jsonObject.put(name, stringBuffer.toString()); - out.collect(JsonMapper.toJsonString(jsonObject)); - } else { - stringBuffer.append(jobList[1]).append(proto); - if (proto.equals(appName)) { - jsonObject.put(APP_NAME, appName); - } - jsonObject.put(name, stringBuffer.toString()); - out.collect(JsonMapper.toJsonString(jsonObject)); - } - } - } - } -} diff --git a/src/main/java/com/zdjizhi/utils/functions/filter/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/filter/FilterNullFunction.java deleted file mode 100644 index 6c83b38..0000000 --- a/src/main/java/com/zdjizhi/utils/functions/filter/FilterNullFunction.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.zdjizhi.utils.functions.filter; - -import com.zdjizhi.utils.StringUtil; -import org.apache.flink.api.common.functions.FilterFunction; - -/** - * @author qidaijie - * @Package com.zdjizhi.utils.functions - * @Description: - * @date 2021/5/2715:01 - */ -public class FilterNullFunction implements FilterFunction<String> { - @Override - public boolean filter(String message) { - return StringUtil.isNotBlank(message); - } -} diff --git a/src/main/java/com/zdjizhi/utils/functions/keyby/FirstKeyByFunction.java b/src/main/java/com/zdjizhi/utils/functions/keyby/FirstKeyByFunction.java index 7783676..eed7c44 100644 --- a/src/main/java/com/zdjizhi/utils/functions/keyby/FirstKeyByFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/keyby/FirstKeyByFunction.java @@ -1,10 +1,10 @@ package com.zdjizhi.utils.functions.keyby; -import cn.hutool.core.util.RandomUtil; -import com.zdjizhi.common.StreamAggregateConfig; +import com.alibaba.fastjson2.JSONObject; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple4; + +import java.util.Map; /** * @author qidaijie @@ -12,10 +12,10 @@ import org.apache.flink.api.java.tuple.Tuple4; * @Description: * @date 2021/7/2112:13 */ -public class FirstKeyByFunction implements KeySelector<Tuple3< String, String, String>, String> { +public class FirstKeyByFunction implements KeySelector<Tuple3< String, String, JSONObject>, String> { @Override - public String getKey(Tuple3<String, String, String> value) throws Exception { + public String getKey(Tuple3<String, String, JSONObject> value) throws Exception { //以map拼接的key分组 return value.f0; } diff --git a/src/main/java/com/zdjizhi/utils/functions/keyby/SecondKeyByFunction.java b/src/main/java/com/zdjizhi/utils/functions/keyby/SecondKeyByFunction.java index fd81d6e..7fd1b27 100644 --- a/src/main/java/com/zdjizhi/utils/functions/keyby/SecondKeyByFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/keyby/SecondKeyByFunction.java @@ -1,10 +1,10 @@ package com.zdjizhi.utils.functions.keyby; -import cn.hutool.core.util.RandomUtil; -import com.zdjizhi.common.StreamAggregateConfig; + +import com.alibaba.fastjson2.JSONObject; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple4; + /** * @author qidaijie @@ -12,10 +12,10 @@ import org.apache.flink.api.java.tuple.Tuple4; * @Description: * @date 2021/7/2112:13 */ -public class SecondKeyByFunction implements KeySelector<Tuple2<String,String>, String> { +public class SecondKeyByFunction implements KeySelector<Tuple2<String, JSONObject>, String> { @Override - public String getKey(Tuple2<String, String> value) throws Exception { + public String getKey(Tuple2<String, JSONObject> value) throws Exception { //以map拼接的key分组 return value.f0; } diff --git a/src/main/java/com/zdjizhi/utils/functions/parse/ParseMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/parse/ParseMapFunction.java index 4d69c82..a58b637 100644 --- a/src/main/java/com/zdjizhi/utils/functions/parse/ParseMapFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/parse/ParseMapFunction.java @@ -2,16 +2,15 @@ package com.zdjizhi.utils.functions.parse; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; import com.zdjizhi.common.StreamAggregateConfig; -import com.zdjizhi.utils.JsonMapper; import com.zdjizhi.utils.StringUtil; import com.zdjizhi.utils.general.ParseFunctions; -import com.zdjizhi.utils.json.JsonParseUtil; +import com.zdjizhi.utils.meta.MetaDataParse; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple3; -import java.util.ArrayList; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; @@ -22,20 +21,19 @@ import java.util.concurrent.ThreadLocalRandom; * @Description: * @date 2021/5/2715:01 */ -public class ParseMapFunction implements MapFunction<String, Tuple3<String, String, String>> { +public class ParseMapFunction implements MapFunction<String, Tuple3<String, String, JSONObject>> { private static final Log logger = LogFactory.get(); @Override @SuppressWarnings("unchecked") - public Tuple3<String, String, String> map(String message) { + public Tuple3<String, String, JSONObject> map(String message) { try { - ArrayList<String[]> jobList = JsonParseUtil.getTransformsList(); - HashMap<String, String> dimensionsMap = JsonParseUtil.getDimensionsMap(); if (StringUtil.isNotBlank(message)) { - Map<String, Object> originalLog = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class); - Map<String, Object> dimensionsObj = ParseFunctions.transDimensions(dimensionsMap, originalLog); + JSONObject originalLog = JSON.parseObject(message); + Map<String, Object> dimensionsObj = ParseFunctions.transDimensions(MetaDataParse.getDimensionsMap(), originalLog); if (ParseFunctions.filterLogs(originalLog)) { - for (String[] strings : jobList) { + JSONObject metricsLog = ParseFunctions.getMetricsLog(originalLog); + for (String[] strings : MetaDataParse.getTransformsList()) { //函数名称 String function = strings[0]; //结果集字段key @@ -44,9 +42,8 @@ public class ParseMapFunction implements MapFunction<String, Tuple3<String, Stri String logsKeyName = strings[2]; //额外的参数的值 String parameters = strings[3]; - //原始日志字段对应的值 - Object logsValue = JsonParseUtil.getValue(originalLog, logsKeyName); + Object logsValue = originalLog.get(logsKeyName); switch (function) { case "combination": @@ -64,8 +61,8 @@ public class ParseMapFunction implements MapFunction<String, Tuple3<String, Stri } break; case "hierarchy": - String key = JsonParseUtil.getString(dimensionsObj, resultKeyName) + "@" + ThreadLocalRandom.current().nextInt(StreamAggregateConfig.RANDOM_RANGE_NUM); - return new Tuple3<>(key, JsonMapper.toJsonString(dimensionsObj), ParseFunctions.getMetricsLog(originalLog)); + String key = dimensionsObj.get(resultKeyName) + "@" + ThreadLocalRandom.current().nextInt(StreamAggregateConfig.RANDOM_RANGE_NUM); + return new Tuple3<>(key, JSONObject.toJSONString(dimensionsObj), metricsLog); default: break; } @@ -74,9 +71,9 @@ public class ParseMapFunction implements MapFunction<String, Tuple3<String, Stri } } catch (RuntimeException e) { logger.error("An error occurred in the original log parsing reorganization,error message is:" + e); - return new Tuple3<>("", "", ""); + return new Tuple3<>("", "", null); } - return new Tuple3<>("", "", ""); + return new Tuple3<>("", "", null); } } diff --git a/src/main/java/com/zdjizhi/utils/functions/result/ResultFlatMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/result/ResultFlatMapFunction.java new file mode 100644 index 0000000..ec98c28 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/result/ResultFlatMapFunction.java @@ -0,0 +1,51 @@ +package com.zdjizhi.utils.functions.result; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson2.JSONObject; +import com.zdjizhi.common.StreamAggregateConfig; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.general.ParseFunctions; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.util.Collector; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/7/2114:52 + */ +public class ResultFlatMapFunction implements FlatMapFunction<JSONObject, String> { + private static final Log logger = LogFactory.get(); + + private static final String PROTOCOL_ID_KEY = "protocol_stack_id"; + private static final String APP_NAME_KEY = "app_name"; + private static final String HLL_SKETCH_KEY = "client_ip_sketch"; + + + @Override + @SuppressWarnings("unchecked") + public void flatMap(JSONObject jsonObject, Collector<String> out) throws Exception { + String protocol = jsonObject.getString(PROTOCOL_ID_KEY); + if (jsonObject.containsKey(HLL_SKETCH_KEY)) { + jsonObject.put(HLL_SKETCH_KEY, ParseFunctions.getHllSketch(jsonObject, HLL_SKETCH_KEY)); + } + out.collect(jsonObject.toString()); + jsonObject.remove(APP_NAME_KEY); + + StringBuilder stringBuilder = new StringBuilder(); + String[] protocolIds = protocol.split(StreamAggregateConfig.PROTOCOL_SPLITTER); + int protocolIdsNum = protocolIds.length; + for (int i = 0; i < protocolIdsNum - 1; i++) { + if (StringUtil.isBlank(stringBuilder.toString())) { + stringBuilder.append(protocolIds[i]); + jsonObject.put(PROTOCOL_ID_KEY, stringBuilder.toString()); + out.collect(jsonObject.toString()); + } else { + stringBuilder.append(".").append(protocolIds[i]); + jsonObject.put(PROTOCOL_ID_KEY, stringBuilder.toString()); + out.collect(jsonObject.toString()); + } + } + } +} diff --git a/src/main/java/com/zdjizhi/utils/functions/statistics/FirstCountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/statistics/FirstCountWindowFunction.java index 93844be..b42f107 100644 --- a/src/main/java/com/zdjizhi/utils/functions/statistics/FirstCountWindowFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/statistics/FirstCountWindowFunction.java @@ -1,20 +1,19 @@ package com.zdjizhi.utils.functions.statistics; -import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.StringUtil; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; import com.zdjizhi.utils.general.MetricFunctions; -import com.zdjizhi.utils.general.ParseFunctions; -import com.zdjizhi.utils.json.JsonParseUtil; +import com.zdjizhi.utils.meta.MetaDataParse; +import org.apache.datasketches.hll.HllSketch; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.HashMap; -import java.util.Map; /** * @author qidaijie @@ -22,44 +21,34 @@ import java.util.Map; * @Description: * @date 2021/7/2113:55 */ -public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple3<String, String, String>, Tuple2<String, String>, String, TimeWindow> { - private static final Logger logger = LoggerFactory.getLogger(FirstCountWindowFunction.class); +public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple3<String, String, JSONObject>, Tuple2<String, JSONObject>, String, TimeWindow> { + private static final Log logger = LogFactory.get(); - private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(16); + private HashMap<String, JSONObject> cacheMap = new HashMap<>(32); @Override @SuppressWarnings("unchecked") - public void process(String key, Context context, Iterable<Tuple3<String, String, String>> input, Collector<Tuple2<String, String>> output) { + public void process(String key, Context context, Iterable<Tuple3<String, String, JSONObject>> input, Collector<Tuple2<String, JSONObject>> output) { try { - HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricFunctionsMap(); - HashMap<String, String[]> actionMap = JsonParseUtil.getActionMap(); - for (Tuple3<String, String, String> tuple : input) { - String groupKey = tuple.f0; - String protocol = groupKey.substring(0, groupKey.indexOf("@")); + HashMap<String, String[]> metricsMap = MetaDataParse.getMetricFunctionsMap(); + for (Tuple3<String, String, JSONObject> tuple : input) { String dimensions = tuple.f1; - String metrics = tuple.f2; - //action中某个协议的所有action,如果没有就默认 - String[] protocolMetrics = actionMap.getOrDefault(protocol, actionMap.get("Default")); - if (StringUtil.isNotBlank(metrics)) { - Map<String, Object> dimensionsObj = (Map<String, Object>) JsonMapper.fromJsonString(dimensions, Map.class); - Map<String, Object> metricsObj = (Map<String, Object>) JsonMapper.fromJsonString(metrics, Map.class); + JSONObject metrics = tuple.f2; + JSONObject cacheMessage = cacheMap.getOrDefault(dimensions, JSON.parseObject(dimensions)); - Map<String, Object> cacheMessage = cacheMap.getOrDefault(dimensions, dimensionsObj); - for (String protocolMetric : protocolMetrics) { - String[] functions = metricsMap.get(protocolMetric); - String function = functions[0]; - String fieldName = functions[1]; - functionSet(function, cacheMessage, protocolMetric, cacheMessage.get(protocolMetric), JsonParseUtil.getValue(metricsObj, fieldName)); - - } - cacheMap.put(dimensions, cacheMessage); + for (String resultKeyName : metricsMap.keySet()) { + String[] functions = metricsMap.get(resultKeyName); + String function = functions[0]; + String fieldName = functions[1]; + functionSet(function, cacheMessage, resultKeyName, metrics.get(fieldName)); } + cacheMap.put(dimensions, cacheMessage); } + if (!cacheMap.isEmpty()) { for (String dimensions : cacheMap.keySet()) { - Map<String, Object> resultMap = cacheMap.get(dimensions); - output.collect(new Tuple2<>(dimensions, JsonMapper.toJsonString(resultMap))); + output.collect(new Tuple2<>(dimensions, cacheMap.get(dimensions))); } } @@ -71,27 +60,25 @@ public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple3<Strin } } + /** * 根据schema描述对应字段进行操作的 函数集合 * * @param function 函数名称 * @param cacheMessage 结果集 - * @param nameValue 当前值 + * @param resultKeyName 结果字段名称 * @param fieldNameValue 新加值 */ - private static void functionSet(String function, Map<String, Object> cacheMessage, String resultName, Object nameValue, Object fieldNameValue) { + private static void functionSet(String function, JSONObject cacheMessage, String resultKeyName, Object fieldNameValue) { switch (function) { case "sum": - cacheMessage.put(resultName, MetricFunctions.longSum(nameValue, fieldNameValue)); + cacheMessage.put(resultKeyName, MetricFunctions.longSum(cacheMessage.get(resultKeyName), fieldNameValue)); break; case "count": - cacheMessage.put(resultName, MetricFunctions.count(nameValue)); - break; - case "unique_sip_num": - //TODO + cacheMessage.put(resultKeyName, MetricFunctions.count(cacheMessage.get(resultKeyName))); break; - case "unique_cip_num": - //TODO + case "HLLSketchBuild": + cacheMessage.put(resultKeyName, MetricFunctions.uniqueHllSketch((HllSketch) cacheMessage.get(resultKeyName), fieldNameValue.toString())); break; default: break; diff --git a/src/main/java/com/zdjizhi/utils/functions/statistics/SecondCountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/statistics/SecondCountWindowFunction.java index c0b2091..e46da07 100644 --- a/src/main/java/com/zdjizhi/utils/functions/statistics/SecondCountWindowFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/statistics/SecondCountWindowFunction.java @@ -1,15 +1,17 @@ package com.zdjizhi.utils.functions.statistics; -import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.StringUtil; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; import com.zdjizhi.utils.general.MetricFunctions; -import com.zdjizhi.utils.json.JsonParseUtil; +import com.zdjizhi.utils.general.ParseFunctions; +import com.zdjizhi.utils.meta.MetaDataParse; +import org.apache.datasketches.hll.HllSketch; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; @@ -20,28 +22,26 @@ import java.util.Map; * @Description: * @date 2021/7/2113:55 */ -public class SecondCountWindowFunction extends ProcessWindowFunction<Tuple2<String, String>, String, String, TimeWindow> { - private static final Logger logger = LoggerFactory.getLogger(SecondCountWindowFunction.class); +public class SecondCountWindowFunction extends ProcessWindowFunction<Tuple2<String, JSONObject>, JSONObject, String, TimeWindow> { + private static final Log logger = LogFactory.get(); - private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(16); + private HashMap<String, JSONObject> cacheMap = new HashMap<>(32); @Override @SuppressWarnings("unchecked") - public void process(String key, Context context, Iterable<Tuple2<String, String>> input, Collector<String> output) { + public void process(String key, Context context, Iterable<Tuple2<String, JSONObject>> input, Collector<JSONObject> output) { try { - HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricFunctionsMap(); - for (Tuple2<String, String> tuple : input) { + HashMap<String, String[]> metricsMap = MetaDataParse.getMetricFunctionsMap(); + for (Tuple2<String, JSONObject> tuple : input) { String dimensions = tuple.f0; - String message = tuple.f1; - if (StringUtil.isNotBlank(message)) { - Map<String, Object> dimensionsObj = (Map<String, Object>) JsonMapper.fromJsonString(dimensions, Map.class); - Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class); + Map<String, Object> message = tuple.f1; + if (message.size() != 0) { + JSONObject cacheMessage = cacheMap.getOrDefault(dimensions, JSON.parseObject(dimensions)); - Map<String, Object> cacheMessage = cacheMap.getOrDefault(dimensions, dimensionsObj); - for (String name : metricsMap.keySet()) { - String[] metrics = metricsMap.get(name); + for (String resultName : metricsMap.keySet()) { + String[] metrics = metricsMap.get(resultName); String function = metrics[0]; - functionSet(function, cacheMessage, name, cacheMessage.get(name), JsonParseUtil.getValue(object, name)); + functionSet(function, cacheMessage, resultName, message.get(resultName)); } cacheMap.put(dimensions, cacheMessage); @@ -52,9 +52,9 @@ public class SecondCountWindowFunction extends ProcessWindowFunction<Tuple2<Stri Long endTime = context.window().getEnd() / 1000; for (String countKey : cacheMap.keySet()) { - Map<String, Object> resultMap = cacheMap.get(countKey); - JsonParseUtil.setValue(resultMap, JsonParseUtil.getResultTimeKey(), endTime); - output.collect(JsonMapper.toJsonString(resultMap)); + JSONObject resultMap = cacheMap.get(countKey); + resultMap.put(MetaDataParse.getResultTimeKey(), endTime); + output.collect(resultMap); } } @@ -71,22 +71,19 @@ public class SecondCountWindowFunction extends ProcessWindowFunction<Tuple2<Stri * * @param function 函数名称 * @param cacheMessage 结果集 - * @param nameValue 当前值 + * @param resultKeyName 结果字段名称 * @param fieldNameValue 新加值 */ - private static void functionSet(String function, Map<String, Object> cacheMessage, String resultName, Object nameValue, Object fieldNameValue) { + private static void functionSet(String function, Map<String, Object> cacheMessage, String resultKeyName, Object fieldNameValue) { switch (function) { case "sum": - cacheMessage.put(resultName, MetricFunctions.longSum(nameValue, fieldNameValue)); + cacheMessage.put(resultKeyName, MetricFunctions.longSum(cacheMessage.get(resultKeyName), fieldNameValue)); break; case "count": - cacheMessage.put(resultName, MetricFunctions.count(nameValue)); + cacheMessage.put(resultKeyName, MetricFunctions.count(cacheMessage.get(resultKeyName))); break; - case "unique_sip_num": - //TODO - break; - case "unique_cip_num": - //TODO + case "HLLSketchBuild": + cacheMessage.put(resultKeyName, MetricFunctions.hllSketchUnion((HllSketch) cacheMessage.get(resultKeyName), (HllSketch) fieldNameValue)); break; default: break; diff --git a/src/main/java/com/zdjizhi/utils/general/MetricFunctions.java b/src/main/java/com/zdjizhi/utils/general/MetricFunctions.java index 0672179..e0d2e03 100644 --- a/src/main/java/com/zdjizhi/utils/general/MetricFunctions.java +++ b/src/main/java/com/zdjizhi/utils/general/MetricFunctions.java @@ -1,7 +1,12 @@ package com.zdjizhi.utils.general; -import com.zdjizhi.utils.json.JsonTypeUtil; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.utils.StringUtil; +import org.apache.datasketches.hll.HllSketch; +import org.apache.datasketches.hll.Union; + /** * @author qidaijie @@ -10,6 +15,9 @@ import com.zdjizhi.utils.json.JsonTypeUtil; * @date 2021/7/2015:31 */ public class MetricFunctions { + private static final Log logger = LogFactory.get(); + + /** * Long类型的数据求和 * @@ -18,8 +26,8 @@ public class MetricFunctions { * @return value1 + value2 */ public static Long longSum(Object value1, Object value2) { - Long res1 = JsonTypeUtil.checkLongValue(value1); - Long res2 = JsonTypeUtil.checkLongValue(value2); + Long res1 = checkLongValue(value1); + Long res2 = checkLongValue(value2); return res1 + res2; } @@ -31,7 +39,68 @@ public class MetricFunctions { * @return count+1 */ public static Long count(Object count) { + return checkLongValue(count) + 1L; + } + + /** + * 更新HllSketch内容 + * + * @param sketch 当前sketch + * @param str ip地址 + * @return 更新后sketch + */ + public static HllSketch uniqueHllSketch(HllSketch sketch, String str) { + if (StringUtil.isNotBlank(str)) { + if (sketch != null) { + sketch.update(str); + } else { + sketch = new HllSketch(12); + sketch.update(str); + } + } + + return sketch; + } + + /** + * @param cacheSketch 缓存的sketch + * @param newSketch 聚合后的sketch + * @return 合并后的sketch + */ + public static HllSketch hllSketchUnion(HllSketch cacheSketch, HllSketch newSketch) { + Union union = new Union(12); + if (cacheSketch != null) { + union.update(cacheSketch); + } + if (newSketch != null) { + union.update(newSketch); + } + return HllSketch.heapify(union.getResult().toUpdatableByteArray()); + } + + + private static long checkLongValue(Object value) { + if (value == null) { + return 0L; + } + + if (value instanceof Long) { + return ((Long) value); + } + + if (value instanceof Number) { + return ((Number) value).longValue(); + } + + if (value instanceof String) { + String str = (String) value; + try { + return Long.parseLong(str); + } catch (NumberFormatException e) { + logger.error("Can not cast " + value.getClass() + "to Long,exception is:" + e.getMessage()); + } + } - return JsonTypeUtil.checkLongValue(count) + 1L; + return 0L; } } diff --git a/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java b/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java index 0c8d7df..f8b2a46 100644 --- a/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java +++ b/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java @@ -3,14 +3,15 @@ package com.zdjizhi.utils.general; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.jayway.jsonpath.InvalidPathException; +import com.alibaba.fastjson2.JSONObject; import com.jayway.jsonpath.JsonPath; import com.zdjizhi.common.StreamAggregateConfig; -import com.zdjizhi.utils.JsonMapper; import com.zdjizhi.utils.StringUtil; -import com.zdjizhi.utils.json.JsonParseUtil; +import com.zdjizhi.utils.meta.MetaDataParse; +import org.apache.datasketches.hll.HllSketch; import java.util.ArrayList; +import java.util.Base64; import java.util.HashMap; import java.util.Map; @@ -30,18 +31,15 @@ public class ParseFunctions { * @param object 原始日志 * @return true or false */ - public static boolean filterLogs(Map<String, Object> object) { + public static boolean filterLogs(JSONObject object) { boolean available = false; - HashMap<String, String> filtersMap = JsonParseUtil.getFiltersMap(); + HashMap<String, String> filtersMap = MetaDataParse.getFiltersMap(); for (String key : filtersMap.keySet()) { - switch (key) { - case "notempty": - String value = JsonParseUtil.getString(object, filtersMap.get(key)); - if (StringUtil.isNotBlank(value)) { - available = true; - } - break; - default: + if ("notempty".equals(key)) { + String value = object.getString(filtersMap.get(key)); + if (StringUtil.isNotBlank(value)) { + available = true; + } } } return available; @@ -50,15 +48,16 @@ public class ParseFunctions { /** * 解析 dimensions 字段集 * - * @param dimensions 维度集 - * @param originalLog 原始日志 + * @param dimensions 维度集 + * @param originalLog 原始日志 * @return 结果维度集 */ - public static Map<String, Object> transDimensions(Map<String, String> dimensions, Map<String, Object> originalLog) { + public static Map<String, Object> transDimensions(Map<String, String> dimensions, JSONObject originalLog) { HashMap<String, Object> dimensionsObj = new HashMap<>(16); - for (String dimension : dimensions.keySet()) { - dimensionsObj.put(dimension, JsonParseUtil.getValue(originalLog, dimensions.get(dimension))); + for (String key : dimensions.keySet()) { + originalLog.get(dimensions.get(key)); + dimensionsObj.put(key, originalLog.get(dimensions.get(key))); } return dimensionsObj; @@ -68,57 +67,44 @@ public class ParseFunctions { * 根据原始日志字段,生成schema内指定的metrics指标json。 * * @param originalLog 原始日志json - * @return 统计metrics json + * @return 统计metrics meta */ - public static String getMetricsLog(Map<String, Object> originalLog) { - Map<String, Object> json = new HashMap<>(16); + public static JSONObject getMetricsLog(JSONObject originalLog) { + JSONObject metricsJson = new JSONObject(); - for (String logsKeyName : JsonParseUtil.getMetricsFiledNameList()) { - json.put(logsKeyName, originalLog.get(logsKeyName)); + for (String logsKeyName : MetaDataParse.getMetricsFiledNameList()) { + if (originalLog.containsKey(logsKeyName)) { + metricsJson.put(logsKeyName, originalLog.get(logsKeyName)); + } } - - return JsonMapper.toJsonString(json); + return metricsJson; } /** - * alignment ID替换操作 - * 根据缓存中的AppId对应信息,获取当前AppId对应的具体名称。 - * - * @param parameters 参数集 - * @param fieldName 原始日志列名 - */ - public static String dismantlingUtils(String parameters, Object fieldName) { - String[] alignmentPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER); - int digits = Integer.parseInt(alignmentPars[0]); - return fieldName.toString().split(StreamAggregateConfig.PROTOCOL_SPLITTER)[digits]; - } - - /** * combination 拼接操作 * 获取方法函数中 parameters 字段,结构 "parameters": "abc,/" ;abc为要拼接字段 /为拼接的分隔符 * * @param parameters 参数集 - * @param message 原始日志 + * @param originalLog 原始日志 * @param logsKeyName 原始日志列名 */ - public static void combinationUtils(Map<String, Object> dimensions, Map<String, Object> message, String parameters, String resultKeyName, String logsKeyName) { + public static void combinationUtils(Map<String, Object> dimensions, JSONObject originalLog, String parameters, String resultKeyName, String logsKeyName) { String[] combinationPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER); String combinationFieldKey = combinationPars[0]; String separator = combinationPars[1]; - Object combinationFieldValue = JsonParseUtil.getValue(message, combinationFieldKey); + Object combinationFieldValue = originalLog.get(combinationFieldKey); if (combinationFieldValue != null) { - Object logsFieldValue = JsonParseUtil.getValue(message, logsKeyName); + Object logsFieldValue = originalLog.get(logsKeyName); String combinationResult = logsFieldValue + separator + combinationFieldValue; - JsonParseUtil.setValue(dimensions, resultKeyName, combinationResult); - JsonParseUtil.setValue(message, logsKeyName, combinationResult); + dimensions.put(resultKeyName, combinationResult); } } /** * 根据表达式解析json * <p> - * //* @param message json + * //* @param message meta * * @param expr 解析表达式 * @return 解析结果 @@ -137,4 +123,80 @@ public class ParseFunctions { } } + /** + * 判断是否为日志字段,是则返回对应value,否则返回原始字符串 + * + * @param jsonMap 内存实体类 + * @param param 字段名/普通字符串 + * @return JSON.Value or String + */ + @Deprecated + private static Object isJsonValue(Map<String, Object> jsonMap, String param) { + if (param.contains(StreamAggregateConfig.IS_JSON_KEY_TAG)) { + return jsonMap.get(param.substring(2)); + } else { + return param; + } + } + + /** + * IF函数实现,解析日志构建三目运算;包含判断是否为数字若为数字则转换为long类型返回结果。 + * + * @param jsonMap 内存实体类 + * @param ifParam 字段名/普通字符串 + * @return resultA or resultB or null + */ + @Deprecated + public static Object condition(Map<String, Object> jsonMap, String ifParam) { + Object result = null; + String separator = "!="; + try { + String[] split = ifParam.split(StreamAggregateConfig.FORMAT_SPLITTER); + if (split.length == 3) { + String expression = split[0]; + Object resultA = isJsonValue(jsonMap, split[1]); + Object resultB = isJsonValue(jsonMap, split[2]); + if (expression.contains(separator)) { + String[] regexp = expression.split(separator); + Object direction = isJsonValue(jsonMap, regexp[0]); + if (direction instanceof Number) { + result = Integer.parseInt(direction.toString()) != Integer.parseInt(regexp[1]) ? resultA : resultB; + } else if (direction instanceof String) { + result = direction.equals(regexp[1]) ? resultA : resultB; + } + } else { + String[] regexp = expression.split(StreamAggregateConfig.IF_CONDITION_SPLITTER); + Object direction = isJsonValue(jsonMap, regexp[0]); + if (direction instanceof Number) { + result = Integer.parseInt(direction.toString()) == Integer.parseInt(regexp[1]) ? resultA : resultB; + } else if (direction instanceof String) { + result = direction.equals(regexp[1]) ? resultA : resultB; + } + } + } + } catch (RuntimeException e) { + logger.error("IF function execution exception, exception information:" + e.getMessage()); + } + return result; + } + + /** + * 获取HLLSketch内容 + * + * @param jsonMap 原始日志 + * @param key meta key名称 + * @return HLLSketch数据数组 + */ + public static String getHllSketch(JSONObject jsonMap, String key) { + try { + HllSketch hllSketchResult = (HllSketch) jsonMap.get(key); + if (hllSketchResult != null) { + return Base64.getEncoder().encodeToString(hllSketchResult.toUpdatableByteArray()); + } + } catch (RuntimeException e) { + logger.error("HllSketch data conversion exception,data may be empty! exception:{}", e); + } + return null; + } + } diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java deleted file mode 100644 index 8555b1f..0000000 --- a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java +++ /dev/null @@ -1,140 +0,0 @@ -package com.zdjizhi.utils.json; - - -import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.exception.AnalysisException; - -import java.util.List; -import java.util.Map; - -/** - * @author qidaijie - * @Package PACKAGE_NAME - * @Description: - * @date 2021/7/1217:34 - */ -public class JsonTypeUtil { - /** - * String 类型检验转换方法 - * - * @param value json value - * @return String value - */ - public static String checkString(Object value) { - if (value == null) { - return null; - } - - if (value instanceof Map) { - return JsonMapper.toJsonString(value); - } - - if (value instanceof List) { - return JsonMapper.toJsonString(value); - } - - return value.toString(); - } - - /** - * array 类型检验转换方法 - * - * @param value json value - * @return List value - */ - private static Map checkObject(Object value) { - if (value == null) { - return null; - } - - if (value instanceof Map) { - return (Map) value; - } - - throw new AnalysisException("can not cast to map, value : " + value); - } - - /** - * array 类型检验转换方法 - * - * @param value json value - * @return List value - */ - private static List checkArray(Object value) { - if (value == null) { - return null; - } - - if (value instanceof List) { - return (List) value; - } - - throw new AnalysisException("can not cast to List, value : " + value); - } - - private static Long checkLong(Object value) { - if (value == null) { - return null; - } - - return TypeUtils.castToLong(value); - } - - /** - * long 类型检验转换方法,若为空返回基础值 - * - * @param value json value - * @return Long value - */ - public static long checkLongValue(Object value) { - - Long longVal = TypeUtils.castToLong(value); - - if (longVal == null) { - return 0L; - } - - return longVal; - } - - /** - * Double 类型校验转换方法 - * - * @param value json value - * @return Double value - */ - private static Double checkDouble(Object value) { - if (value == null) { - return null; - } - - return TypeUtils.castToDouble(value); - } - - - private static Integer checkInt(Object value) { - if (value == null) { - return null; - } - - return TypeUtils.castToInt(value); - } - - - /** - * int 类型检验转换方法,若为空返回基础值 - * - * @param value json value - * @return int value - */ - private static int getIntValue(Object value) { - - Integer intVal = TypeUtils.castToInt(value); - - if (intVal == null) { - return 0; - } - return intVal; - } - -} diff --git a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java deleted file mode 100644 index 9cb0631..0000000 --- a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java +++ /dev/null @@ -1,172 +0,0 @@ -package com.zdjizhi.utils.json; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.zdjizhi.common.StreamAggregateConfig; -import com.zdjizhi.utils.StringUtil; -import com.zdjizhi.utils.exception.AnalysisException; - - -/** - * @author qidaijie - * @Package PACKAGE_NAME - * @Description: - * @date 2021/7/1218:20 - */ -public class TypeUtils { - private static final Log logger = LogFactory.get(); - - /** - * Integer 类型判断方法 - * - * @param value json value - * @return Integer value or null - */ - public static Object castToIfFunction(Object value) { - if (value == null) { - return null; - } - - if (value instanceof String) { - return value.toString(); - } - - if (value instanceof Integer) { - return ((Number) value).intValue(); - } - - if (value instanceof Long) { - return ((Number) value).longValue(); - } - - if (value instanceof Boolean) { - return (Boolean) value ? 1 : 0; - } - - throw new AnalysisException("can not cast to int, value : " + value); - } - - /** - * Integer 类型判断方法 - * - * @param value json value - * @return Integer value or null - */ - static Integer castToInt(Object value) { - - if (value == null) { - return null; - } - - if (value instanceof Integer) { - return (Integer) value; - } - - //此判断数值超范围不抛出异常,会截取成对应类型数值 -// if (value instanceof Number) { -// return ((Number) value).intValue(); -// } - - if (value instanceof String) { - String strVal = (String) value; - if (StringUtil.isBlank(strVal)) { - return null; - } - - //将 10,20 类数据转换为10 - if (strVal.contains(StreamAggregateConfig.FORMAT_SPLITTER)) { - strVal = strVal.split(StreamAggregateConfig.FORMAT_SPLITTER)[0]; - } - - try { - return Integer.parseInt(strVal); - } catch (NumberFormatException ex) { - logger.error("String change Integer Error,The error Str is:" + strVal); - } - } - - if (value instanceof Boolean) { - return (Boolean) value ? 1 : 0; - } - - throw new AnalysisException("can not cast to int, value : " + value); - } - - /** - * Double类型判断方法 - * - * @param value json value - * @return double value or null - */ - static Double castToDouble(Object value) { - - if (value instanceof Double) { - return (Double) value; - } - - //此判断数值超范围不抛出异常,会截取成对应类型数值 -// if (value instanceof Number) { -// return ((Number) value).doubleValue(); -// } - - if (value instanceof String) { - String strVal = (String) value; - - if (StringUtil.isBlank(strVal)) { - return null; - } - - //将 10,20 类数据转换为10 - if (strVal.contains(StreamAggregateConfig.FORMAT_SPLITTER)) { - strVal = strVal.split(StreamAggregateConfig.FORMAT_SPLITTER)[0]; - } - - try { - return Double.parseDouble(strVal); - } catch (NumberFormatException ex) { - logger.error("String change Double Error,The error Str is:" + strVal); - } - } - - throw new AnalysisException("can not cast to double, value : " + value); - } - - /** - * Long类型判断方法 - * - * @param value json value - * @return (Long)value or null - */ - static Long castToLong(Object value) { - if (value == null) { - return null; - } - -// 此判断数值超范围不抛出异常,会截取成对应类型数值 - if (value instanceof Number) { - return ((Number) value).longValue(); - } - - if (value instanceof String) { - String strVal = (String) value; - - if (StringUtil.isBlank(strVal)) { - return null; - } - - //将 10,20 类数据转换为10 - if (strVal.contains(StreamAggregateConfig.FORMAT_SPLITTER)) { - strVal = strVal.split(StreamAggregateConfig.FORMAT_SPLITTER)[0]; - } - - try { - return Long.parseLong(strVal); - } catch (NumberFormatException ex) { - logger.error("String change Long Error,The error Str is:" + strVal); - } - } - - throw new AnalysisException("can not cast to long, value : " + value); - } - -} diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java index 545a0e3..d9f1b37 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java @@ -22,6 +22,7 @@ public class KafkaConsumer { properties.put("session.timeout.ms", StreamAggregateConfig.SESSION_TIMEOUT_MS); properties.put("max.poll.records", StreamAggregateConfig.MAX_POLL_RECORDS); properties.put("max.partition.fetch.bytes", StreamAggregateConfig.MAX_PARTITION_FETCH_BYTES); + properties.put("partition.discovery.interval.ms", "10000"); CertUtils.chooseCert(StreamAggregateConfig.SOURCE_KAFKA_SERVERS, properties); diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/meta/MetaDataParse.java index 4b3f75a..21073ac 100644 --- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java +++ b/src/main/java/com/zdjizhi/utils/meta/MetaDataParse.java @@ -1,4 +1,4 @@ -package com.zdjizhi.utils.json; +package com.zdjizhi.utils.meta; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; @@ -10,29 +10,23 @@ import com.alibaba.nacos.api.exception.NacosException; import com.jayway.jsonpath.DocumentContext; import com.jayway.jsonpath.JsonPath; import com.zdjizhi.common.StreamAggregateConfig; -import com.zdjizhi.utils.JsonMapper; import com.zdjizhi.utils.StringUtil; -import net.sf.cglib.beans.BeanMap; import java.util.*; import java.util.concurrent.Executor; + /** * 使用FastJson解析json的工具类 * * @author qidaijie */ -public class JsonParseUtil { +public class MetaDataParse { private static final Log logger = LogFactory.get(); private static Properties propNacos = new Properties(); /** - * 获取actions所有的计算函数 - */ - private static HashMap<String, String[]> actionMap = new HashMap<>(16); - - /** * 解析metrics指标字段信息 */ private static HashMap<String, String[]> metricFunctionsMap = new HashMap<>(16); @@ -58,11 +52,6 @@ public class JsonParseUtil { private static ArrayList<String> metricsFiledNameList = new ArrayList<>(); /** - * 解析hierarchy函数,获取切分信息 - */ - private static String[] hierarchy; - - /** * 解析时间戳字段名称 */ private static String resultTimeKey = "stat_time"; @@ -99,70 +88,6 @@ public class JsonParseUtil { } /** - * 获取属性值的方法 - * - * @param jsonMap 原始日志 - * @param key josn key名称 - * @return 属性的值 - */ - public static Object getValue(Map<String, Object> jsonMap, String key) { - try { - return jsonMap.getOrDefault(key, null); - } catch (RuntimeException e) { - logger.error("Get the JSON value is abnormal,The key is :" + key + "error message is :" + e); - return null; - } - } - - /** - * long 类型检验转换方法,若为空返回基础值 - * - * @return Long value - */ - public static Long getLong(Map<String, Object> jsonMap, String property) { - Object value = jsonMap.getOrDefault(property, null); - Long longVal = TypeUtils.castToLong(value); - - if (longVal == null) { - return 0L; - } - - return longVal; - } - - public static String getString(Map<String, Object> jsonMap, String property) { - Object value = jsonMap.getOrDefault(property, null); - if (value == null) { - return null; - } - - if (value instanceof Map) { - return JsonMapper.toJsonString(value); - } - - if (value instanceof List) { - return JsonMapper.toJsonString(value); - } - - return value.toString(); - } - - /** - * 更新属性值的方法 - * - * @param jsonMap 原始日志json map - * @param property 更新的key - * @param value 更新的值 - */ - public static void setValue(Map<String, Object> jsonMap, String property, Object value) { - try { - jsonMap.put(property, value); - } catch (RuntimeException e) { - logger.error("The JSON set value is abnormal,the error message is :", e); - } - } - - /** * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 * 用于反射生成schema类型的对象的一个map集合 */ @@ -170,12 +95,6 @@ public class JsonParseUtil { clearCacheMap(); DocumentContext parse = JsonPath.parse(schema); - List<Object> actions = parse.read("$.doc.action[*]"); - for (Object action : actions) { - actionMap.put(JsonPath.read(action, "$.label"), - JsonPath.read(action, "$.metrics").toString().split(StreamAggregateConfig.FORMAT_SPLITTER)); - } - List<Object> metricFunctions = parse.read("$.doc.metrics[*]"); for (Object metric : metricFunctions) { metricFunctionsMap.put(JsonPath.read(metric, "$.name"), @@ -208,27 +127,10 @@ public class JsonParseUtil { transformsList.add(new String[]{function, name, fieldName, parameters}); } - List<Object> hierarchyList = parse.read("$.doc.transforms[*]"); - for (Object transform : hierarchyList) { - String function = JsonPath.read(transform, "$.function").toString(); - if ("hierarchy".equals(function)) { - String name = JsonPath.read(transform, "$.name").toString(); - String parameters = JsonPath.read(transform, "$.parameters").toString(); - hierarchy = new String[]{name, parameters}; - } - } - resultTimeKey = JsonPath.read(schema, "$.doc.timestamp.name"); } /** - * @return 解析schema获取的actions集合 - */ - public static HashMap<String, String[]> getActionMap() { - return actionMap; - } - - /** * @return 解析schema获取的指标统计方式集合 */ public static HashMap<String, String[]> getMetricFunctionsMap() { @@ -264,13 +166,6 @@ public class JsonParseUtil { } /** - * @return 解析schema获取的拆解函数 - */ - public static String[] getHierarchy() { - return hierarchy; - } - - /** * @return 解析schema获取的时间字段的key */ public static String getResultTimeKey() { @@ -281,7 +176,6 @@ public class JsonParseUtil { * 在配置变化时清空缓存,重新解析schema更新缓存 */ private static void clearCacheMap() { - actionMap.clear(); metricFunctionsMap.clear(); dimensionsMap.clear(); filtersMap.clear(); diff --git a/src/test/java/com/zdjizhi/DatasketchesTest.java b/src/test/java/com/zdjizhi/DatasketchesTest.java new file mode 100644 index 0000000..2b4b3a6 --- /dev/null +++ b/src/test/java/com/zdjizhi/DatasketchesTest.java @@ -0,0 +1,254 @@ +package com.zdjizhi; + +import cn.hutool.json.JSONUtil; +import com.alibaba.fastjson2.*; +import com.zdjizhi.utils.JsonMapper; +import org.apache.datasketches.hll.HllSketch; +import org.apache.datasketches.hll.Union; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.Test; + +import java.util.*; + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2023/3/217:17 + */ +public class DatasketchesTest { + + @Test + public void HllSketchTest() { + HashSet<String> strings = new HashSet<>(); + + HllSketch sketch = new HllSketch(12); + + for (int i = 0; i < 50; i++) { + String ip = "192.168.1." + i; + sketch.update(ip); + strings.add(ip); + } + + System.out.println(sketch.getEstimate() + "--" + strings.size()); + + HashSet<String> randomStrings = new HashSet<>(); + + HllSketch randomSketch = new HllSketch(12); + + for (int i = 0; i < 50; i++) { + String ip = makeIPv4Random(); + randomSketch.update(ip); + randomStrings.add(ip); + } + + System.out.println(randomSketch.getEstimate() + "--" + randomStrings.size()); + } + + @Test + public void HllSketchUnionTest() { + HashSet<String> strings = new HashSet<>(); + + HllSketch sketch = new HllSketch(12); + + for (int i = 0; i < 50; i++) { + String ip = "192.168.1." + i; + sketch.update(ip); + strings.add(ip); + } + + HllSketch sketch2 = new HllSketch(12); + + for (int i = 0; i < 10; i++) { + String ip = "192.168.2." + i; + sketch2.update(ip); + strings.add(ip); + } + + Union union = new Union(12); + + union.update(sketch); + union.update(sketch2); + HllSketch sketch_result = HllSketch.heapify(union.getResult().toCompactByteArray()); + + System.out.println(sketch.getEstimate() + "--" + strings.size()); + System.out.println(sketch2.getEstimate() + "--" + strings.size()); + System.out.println(sketch_result.getEstimate() + "--" + strings.size()); + } + + @Test + public void HllSketchDruidTest() { + HashMap<String, Object> dataMap = new HashMap<>(); + + HashSet<String> strings = new HashSet<>(); + + HllSketch sketch = new HllSketch(12); + + for (int i = 0; i < 50; i++) { + String ip = "192.168.1." + i; + sketch.update(ip); + strings.add(ip); + } + + HllSketch sketch2 = new HllSketch(12); + + for (int i = 0; i < 10; i++) { + String ip = "192.168.2." + i; + sketch2.update(ip); + strings.add(ip); + } + + Union union = new Union(12); + + union.update(sketch); + union.update(sketch2); + HllSketch sketch_result1 = HllSketch.heapify(union.getResult().toCompactByteArray()); + + HllSketch sketch3 = new HllSketch(12); + + for (int i = 0; i < 10; i++) { + String ip = "192.168.3." + i; + sketch3.update(ip); + strings.add(ip); + } + + Union union2 = new Union(12); + + union2.update(sketch_result1); + union2.update(sketch3); + HllSketch sketch_result2 = HllSketch.heapify(union2.getResult().toCompactByteArray()); + + System.out.println(sketch.getEstimate() + "--" + strings.size()); + System.out.println(sketch2.getEstimate() + "--" + strings.size()); + System.out.println(sketch3.getEstimate() + "--" + strings.size()); + System.out.println(sketch_result1.getEstimate() + "--" + strings.size()); + System.out.println(sketch_result2.getEstimate() + "--" + strings.size()); + + Result result = new Result(); + result.setC2s_pkt_num(10); + result.setS2c_pkt_num(10); + result.setC2s_byte_num(10); + result.setS2c_byte_num(10); + result.setStat_time(1679970031); + result.setSchema_type("HLLSketchMergeTest"); + + //CompactByte + result.setIp_object(sketch_result2.toCompactByteArray()); +// System.out.println(result.toString()); + //sendMessage(JsonMapper.toJsonString(result); + + + //UpdatableByte + result.setIp_object(sketch_result2.toUpdatableByteArray()); +// System.out.println(result.toString()); + //sendMessage(JsonMapper.toJsonString(result); + + //Hashmap + dataMap.put("app_name", "TEST"); + dataMap.put("protocol_stack_id", "HTTP"); + dataMap.put("vsys_id", 1); + dataMap.put("stat_time", 1681370100); + dataMap.put("client_ip_sketch", sketch_result2.toUpdatableByteArray()); + + System.out.println("Jackson:" + JsonMapper.toJsonString(dataMap)); + System.out.println("FastJson2:" + JSONObject.toJSONString(dataMap)); + System.out.println("Hutool:" + JSONUtil.toJsonStr(dataMap) + "\n\n"); + + byte[] toJSONB = JSONB.toBytes(dataMap); +// sendMessage(toJSONB); + JSONObject jsonObject = JSONB.parseObject(toJSONB); + System.out.println("FastJson2 Byte(JSONB):" + jsonObject.toJSONString() + "\n\n"); + + + dataMap.put("client_ip_sketch", Base64.getEncoder().encodeToString(sketch_result2.toUpdatableByteArray())); + System.out.println("FastJson2 Byte(Base64):" + JSONObject.toJSONString(dataMap)); + System.out.println("Hutool Byte(Base64):" + JSONObject.toJSONString(dataMap)); + System.out.println(JSONUtil.toJsonStr(dataMap)); + + +// sendMessage(JSONObject.toJSONString(dataMap)); + } + + + //随机生成ip + private static String makeIPv4Random() { + Random random = new Random(); + int v4_1 = new Random().nextInt(255) + 1; + int v4_2 = new Random().nextInt(255); + int v4_3 = new Random().nextInt(255); + int v4_4 = new Random().nextInt(255); + return v4_1 + "." + v4_2 + "." + v4_3 + "." + v4_4; + } + + private static void sendMessage(Object message) { + Properties props = new Properties(); + //kafka地址 + props.put("bootstrap.servers", "192.168.44.12:9092"); + props.put("acks", "all"); + props.put("retries", 0); + props.put("linger.ms", 1); + props.put("buffer.memory", 67108864); +// props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); +// props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + KafkaProducer<String, Object> kafkaProducer = new KafkaProducer<String, Object>(props); + + kafkaProducer.send(new ProducerRecord<String, Object>("TRAFFIC-PROTOCOL-TEST", message)); + + kafkaProducer.close(); + } +} + +class Result { + + private String schema_type; + private long c2s_byte_num; + private long c2s_pkt_num; + private long s2c_byte_num; + private long s2c_pkt_num; + private long stat_time; + private byte[] ip_object; + + public void setSchema_type(String schema_type) { + this.schema_type = schema_type; + } + + public void setC2s_byte_num(long c2s_byte_num) { + this.c2s_byte_num = c2s_byte_num; + } + + public void setC2s_pkt_num(long c2s_pkt_num) { + this.c2s_pkt_num = c2s_pkt_num; + } + + public void setS2c_byte_num(long s2c_byte_num) { + this.s2c_byte_num = s2c_byte_num; + } + + public void setS2c_pkt_num(long s2c_pkt_num) { + this.s2c_pkt_num = s2c_pkt_num; + } + + public void setStat_time(long stat_time) { + this.stat_time = stat_time; + } + + public void setIp_object(byte[] ip_object) { + this.ip_object = ip_object; + } + + @Override + public String toString() { + return "Result{" + + "schema_type='" + schema_type + '\'' + + ", c2s_byte_num=" + c2s_byte_num + + ", c2s_pkt_num=" + c2s_pkt_num + + ", s2c_byte_num=" + s2c_byte_num + + ", s2c_pkt_num=" + s2c_pkt_num + + ", stat_time=" + stat_time + + ", ip_object=" + Arrays.toString(ip_object) + + '}'; + } +}
\ No newline at end of file diff --git a/src/test/java/com/zdjizhi/FunctionTest.java b/src/test/java/com/zdjizhi/FunctionTest.java new file mode 100644 index 0000000..847dd22 --- /dev/null +++ b/src/test/java/com/zdjizhi/FunctionTest.java @@ -0,0 +1,70 @@ +package com.zdjizhi; + +import com.jayway.jsonpath.JsonPath; +import com.zdjizhi.common.StreamAggregateConfig; +import com.zdjizhi.utils.StringUtil; +import org.junit.Test; + +import java.util.Arrays; + + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2023/1/617:54 + */ +public class FunctionTest { + public static void main(String[] args) { + String groupKey = "ETHERNET.IPv4.TCP.UNCATEGORIZED.qq_r2@4"; + String protocol = groupKey.substring(0, groupKey.indexOf("@")); + System.out.println(protocol); + StringBuffer stringBuffer = new StringBuffer(); + String appName = "qq_r2"; + String[] protocolIds = protocol.split(StreamAggregateConfig.PROTOCOL_SPLITTER); + for (String proto : protocolIds) { + if (StringUtil.isBlank(stringBuffer.toString())) { + stringBuffer.append(proto); + System.out.println(stringBuffer.toString()); + } else { + stringBuffer.append(".").append(proto); + if (proto.equals(appName)) { + System.out.println(stringBuffer.toString() + "---" + appName); + } else { + System.out.println(stringBuffer.toString()); + } + } + } + } + + @Test + public void JsonPathTest() { + String json = "{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-7400\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-7400\"}]}"; + String expr = "$.tags[?(@.tag=='data_center')].value"; + Object read = JsonPath.parse(json).read(expr).toString(); + System.out.println(read); + } + + @Test + public void SplitTest() { + String str = "[.]"; + String protocol = "ETHERNET.IPv4.TCP.http.test"; + + System.out.println(Arrays.toString(protocol.split(str))); + + String str2 = "\\."; + System.out.println(Arrays.toString(protocol.split(str2))); + + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < protocol.split(str).length - 1; i++) { + String value = protocol.split(str)[i]; + if (StringUtil.isBlank(stringBuilder.toString())) { + stringBuilder.append(value); + System.out.println(stringBuilder.toString()); + }else { + stringBuilder.append(".").append(value); + System.out.println(stringBuilder.toString()); + } + } + } +} diff --git a/src/test/java/com/zdjizhi/FunctionsTest.java b/src/test/java/com/zdjizhi/FunctionsTest.java deleted file mode 100644 index 6e3a20b..0000000 --- a/src/test/java/com/zdjizhi/FunctionsTest.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.zdjizhi; - -import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.json.JsonParseUtil; -import org.junit.Test; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -/** - * @author qidaijie - * @Package com.zdjizhi - * @Description: - * @date 2021/9/1714:22 - */ -public class FunctionsTest { - private static HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricsMap(); - - @Test - public void actionTest() { - HashMap<String, String[]> actionMap = JsonParseUtil.getActionMap(); - String[] metricNames = actionMap.getOrDefault("", actionMap.get("Default")); - System.out.println(actionMap.toString()); - System.out.println(Arrays.toString(metricNames)); - - - - } - - - -} diff --git a/src/test/java/com/zdjizhi/JsonTest.java b/src/test/java/com/zdjizhi/JsonTest.java new file mode 100644 index 0000000..dd0cf97 --- /dev/null +++ b/src/test/java/com/zdjizhi/JsonTest.java @@ -0,0 +1,145 @@ +package com.zdjizhi; + +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.serializer.SerializerFeature; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.JsonPath; +import net.sf.cglib.beans.BeanGenerator; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2023/4/1218:08 + */ +public class JsonTest { + +// /** +// * 在内存中加载反射类用的map +// */ +// private static HashMap<String, Class> map = MetaDataParse.getMapFromHttp(FlowWriteConfig.SCHEMA_HTTP); +// +// /** +// * 反射成一个类 +// */ +// private static Object mapObject = MetaDataParse.generateObject(map); + + @Test + public void fastJson2Test() { +// SerializerFeature.DisableCircularReferenceDetect +// SerializerFeature.WriteNullStringAsEmpty +// SerializerFeature.WriteNullNumberAsZero + HashMap<String, Class> classHashMap = new HashMap<>(); + + String message = "{\"common_schema_type\":\"HTTP\",\"common_sessions\":1,\"http_request_line\":\"GET sampleFile.html HTTP/1.1\",\"http_host\":\"www.texaslotto.com\",\"http_url\":\"www.texaslotto.com/sampleFile.html\",\"http_user_agent\":\"xPTS/2.0\",\"http_response_line\":\"HTTP/1.1 200 OK\",\"http_isn\":1953597368,\"http_proxy_flag\":0,\"http_version\":\"http1\",\"http_response_latency_ms\":1,\"http_session_duration_ms\":2,\"http_response_content_type\":\"text/html\",\"http_sequence\":80,\"common_protocol_label\":\"ETHERNET.IPv4.UDP.GTP.IPv4.TCP\",\"common_c2s_byte_diff\":17200,\"common_c2s_pkt_diff\":120,\"common_s2c_byte_diff\":16490,\"common_s2c_pkt_diff\":81,\"common_c2s_ipfrag_num\":0,\"common_s2c_ipfrag_num\":0,\"common_first_ttl\":64,\"common_c2s_tcp_unorder_num\":0,\"common_s2c_tcp_unorder_num\":0,\"common_c2s_tcp_lostlen\":0,\"common_s2c_tcp_lostlen\":0,\"common_c2s_pkt_retrans\":0,\"common_s2c_pkt_retrans\":0,\"common_c2s_byte_retrans\":0,\"common_s2c_byte_retrans\":0,\"common_flags\":24720,\"common_flags_identify_info\":\"{\\\"Server is Local\\\":1,\\\"Inbound\\\":201,\\\"C2S\\\":1,\\\"S2C\\\":2}\",\"common_direction\":73,\"common_app_full_path\":\"http\",\"common_app_label\":\"http\",\"common_tcp_client_isn\":1953597368,\"common_tcp_server_isn\":1950649408,\"common_server_ip\":\"192.50.199.25\",\"common_client_ip\":\"192.50.146.197\",\"common_server_port\":80,\"common_client_port\":22533,\"common_stream_dir\":3,\"common_address_type\":4,\"common_address_list\":\"IPv4_TCP<22533-80-192.50.146.197-192.50.199.25>|GTP<111001144-851056526>|IPv4_UDP<2152-2152-192.50.235.220-192.50.135.83>|MAC<000c299b2fa4-000c2915b4f4>\",\"common_start_time\":1680475247,\"common_end_time\":1680475247,\"common_con_duration_ms\":23,\"common_s2c_pkt_num\":81,\"common_s2c_byte_num\":16490,\"common_c2s_pkt_num\":120,\"common_c2s_byte_num\":17200,\"common_establish_latency_ms\":2,\"common_client_location\":\"日本.Unknown.Unknown\",\"common_server_location\":\"日本.Unknown.Unknown\",\"common_service_category\":[6223,6219,5093,5089],\"common_apn\":\"cmiott.owflr.mcto60g.com\",\"common_imsi\":\"460045157091460\",\"common_imei\":\"8626070583005833\",\"common_phone_number\":\"861440152028973\",\"common_tunnel_endpoint_a_desc\":\"test_50_gtp\",\"common_tunnel_endpoint_b_desc\":\"test_50_gtp\",\"common_tunnels\":[{\"tunnels_schema_type\":\"GTP\",\"gtp_a2b_teid\":111001144,\"gtp_b2a_teid\":851056526,\"gtp_endpoint_a_ip\":\"192.50.235.220\",\"gtp_endpoint_b_ip\":\"192.50.135.83\",\"gtp_endpoint_a_port\":2152,\"gtp_endpoint_b_port\":2152},{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\",\"c2s_source_mac\":\"00:0c:29:9b:2f:a4\",\"c2s_destination_mac\":\"00:0c:29:15:b4:f4\",\"s2c_source_mac\":\"00:0c:29:15:b4:f4\",\"s2c_destination_mac\":\"00:0c:29:9b:2f:a4\"}],\"common_stream_trace_id\":\"578829229323951427\",\"common_l4_protocol\":\"IPv4_TCP\",\"common_sled_ip\":\"192.168.40.161\",\"common_device_id\":\"unknown\",\"common_device_tag\":\"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-7400\\\"},{\\\"tag\\\":\\\"data_center\\\",\\\"value\\\":\\\"center-xxg-7400\\\"}]}\",\"common_t_vsys_id\":1,\"common_policy_id\":0,\"common_service\":2,\"common_action\":0,\"common_vsys_id\":1}"; + + JSONObject json = JSON.parseObject(message); + Object mapObject = generateObject(classHashMap); + Object object = JSON.parseObject(message, mapObject.getClass()); + + + System.out.println(json.get("common_schema_type")); + json.put("common_schema_type", "SSH"); + + + System.out.println(json.toJSONString()); + + + } + + private static Class getClassName(String type) { + Class clazz; + + switch (type) { + case "int": + clazz = Integer.class; + break; + case "string": + clazz = String.class; + break; + case "long": + clazz = long.class; + break; + case "array": + clazz = List.class; + break; + case "double": + clazz = double.class; + break; + case "float": + clazz = float.class; + break; + case "char": + clazz = char.class; + break; + case "byte": + clazz = byte.class; + break; + case "boolean": + clazz = boolean.class; + break; + case "short": + clazz = short.class; + break; + default: + clazz = String.class; + } + return clazz; + } + + /** + * 根据反射生成对象的方法 + * + * @param properties 反射类用的map + * @return 生成的Object类型的对象 + */ + private static Object generateObject(Map properties) { + BeanGenerator generator = new BeanGenerator(); + Set keySet = properties.keySet(); + for (Object aKeySet : keySet) { + String key = (String) aKeySet; + generator.addProperty(key, (Class) properties.get(key)); + } + return generator.create(); + } + +// /** +// * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 +// * +// * @param http 网关schema地址 +// * @return 用于反射生成schema类型的对象的一个map集合 +// */ +// public static HashMap<String, Class> getMapFromHttp(String schema) { +// HashMap<String, Class> map = new HashMap<>(16); +// +// DocumentContext parse = JsonPath.parse(schema); +// +// //获取fields,并转化为数组,数组的每个元素都是一个name doc type +// com.alibaba.fastjson.JSONObject schemaJson = com.alibaba.fastjson.JSON.parseObject(data.toString()); +// JSONArray fields = (JSONArray) schemaJson.get("fields"); +// +// for (Object field : fields) { +// String filedStr = field.toString(); +// if (checkKeepField(filedStr)) { +// String name = JsonPath.read(filedStr, "$.name").toString(); +// String type = JsonPath.read(filedStr, "$.type").toString(); +// if (type.contains("{")) { +// type = JsonPath.read(filedStr, "$.type.type").toString(); +// } +// //组合用来生成实体类的map +// map.put(name, getClassName(type)); +// } else { +// dropList.add(filedStr); +// } +// } +// return map; +// } +} |
