-rw-r--r--  pom.xml | 33
-rw-r--r--  properties/default_config.properties | 2
-rw-r--r--  properties/service_flow_config.properties | 21
-rw-r--r--  src/main/java/com/zdjizhi/common/StreamAggregateConfig.java | 18
-rw-r--r--  src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java | 23
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java | 50
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/filter/FilterNullFunction.java | 17
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/keyby/FirstKeyByFunction.java | 10
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/keyby/SecondKeyByFunction.java | 10
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/parse/ParseMapFunction.java | 31
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/result/ResultFlatMapFunction.java | 51
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/statistics/FirstCountWindowFunction.java | 71
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/statistics/SecondCountWindowFunction.java | 59
-rw-r--r--  src/main/java/com/zdjizhi/utils/general/MetricFunctions.java | 77
-rw-r--r--  src/main/java/com/zdjizhi/utils/general/ParseFunctions.java | 152
-rw-r--r--  src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java | 140
-rw-r--r--  src/main/java/com/zdjizhi/utils/json/TypeUtils.java | 172
-rw-r--r--  src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java | 1
-rw-r--r--  src/main/java/com/zdjizhi/utils/meta/MetaDataParse.java (renamed from src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java) | 112
-rw-r--r--  src/test/java/com/zdjizhi/DatasketchesTest.java | 254
-rw-r--r--  src/test/java/com/zdjizhi/FunctionTest.java | 70
-rw-r--r--  src/test/java/com/zdjizhi/FunctionsTest.java | 33
-rw-r--r--  src/test/java/com/zdjizhi/JsonTest.java | 145
23 files changed, 861 insertions(+), 691 deletions(-)
diff --git a/pom.xml b/pom.xml
index e4f3384..18789bc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>log-olap-analysis-schema</artifactId>
- <version>220822-VSYS</version>
+ <version>230414-FastJson2</version>
<name>log-olap-analysis-schema</name>
<url>http://www.example.com</url>
@@ -16,7 +16,7 @@
<repository>
<id>nexus</id>
<name>Team Nexus Repository</name>
- <url>http://192.168.40.125:8099/content/groups/public</url>
+ <url>http://192.168.40.153:8099/content/groups/public</url>
</repository>
<repository>
@@ -40,6 +40,7 @@
<hbase.version>2.2.3</hbase.version>
<nacos.version>1.2.0</nacos.version>
<zdjz.tools.version>1.0.8</zdjz.tools.version>
+ <fastjson.version>2.0.26</fastjson.version>
<scope.type>provided</scope.type>
<!--<scope.type>compile</scope.type>-->
</properties>
@@ -188,7 +189,7 @@
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
- <version>2.4.0</version>
+ <version>2.7.0</version>
</dependency>
<dependency>
@@ -209,6 +210,20 @@
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>${nacos.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -218,6 +233,18 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.datasketches</groupId>
+ <artifactId>datasketches-java</artifactId>
+ <version>3.2.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>${fastjson.version}</version>
+ </dependency>
+
</dependencies>
</project>
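
The two new dependencies above drive the rest of this change: datasketches-java supplies the HLL distinct-count sketches, and fastjson2 replaces the previous JsonMapper-based JSON handling on the hot path. A minimal smoke sketch of both (assuming the versions declared in this POM; not part of the commit):

    import com.alibaba.fastjson2.JSON;
    import com.alibaba.fastjson2.JSONObject;
    import org.apache.datasketches.hll.HllSketch;

    public class DepsSmoke {
        public static void main(String[] args) {
            // HLL sketch: approximate distinct counting in bounded memory (lgK = 12, as used below)
            HllSketch sketch = new HllSketch(12);
            sketch.update("192.168.1.1");
            sketch.update("192.168.1.2");
            System.out.println(sketch.getEstimate());   // ~2.0

            // fastjson2: mutable JSONObject, now the in-flight record type in the topology
            JSONObject obj = JSON.parseObject("{\"protocol_stack_id\":\"eth.ipv4.tcp\"}");
            obj.put("vsys_id", 1);
            System.out.println(obj.toJSONString());
        }
    }
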
diff --git a/properties/default_config.properties b/properties/default_config.properties
index c0f8aef..73cffc5 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -3,7 +3,7 @@
session.timeout.ms=60000
#kafka source poll
-max.poll.records=3000
+max.poll.records=5000
#kafka source poll bytes
max.partition.fetch.bytes=31457280
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 97438d9..c5794bd 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -15,41 +15,42 @@ tools.library=D:\\workerspace\\dat
nacos.server=192.168.44.12:8848
#nacos namespace
-nacos.schema.namespace=prod
+nacos.schema.namespace=livecharts
#nacos data id
-nacos.data.id=liveChart_session.json
+nacos.data.id=liveChart_session_test.json
#--------------------------------Kafka consumer group info------------------------------#
#kafka topic to consume source data from
-source.kafka.topic=SESSION-RECORD
+source.kafka.topic=test
#output topic for the enriched data
-sink.kafka.topic=test-result
+sink.kafka.topic=TRAFFIC-PROTOCOL-TEST
#consumer group that stores this source's committed offsets; name it after the topology so the next run resumes without re-reading data
-group.id=livecharts-test-20220816-1
+group.id=livecharts-test-20230327-3
#--------------------------------topology configuration------------------------------#
#consumer parallelism
-source.parallelism=1
+source.parallelism=3
#map function parallelism
-parse.parallelism=1
+parse.parallelism=3
#first window computation parallelism
-first.window.parallelism=1
+first.window.parallelism=3
#second window computation parallelism
-second.window.parallelism=1
+second.window.parallelism=3
#producer parallelism
-sink.parallelism=1
+sink.parallelism=3
#window time (seconds) of the initial randomized pre-aggregation
first.count.window.time=5
#window time (seconds) of the second aggregation
second.count.window.time=15
+
diff --git a/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java
index 0243f97..9ea9df5 100644
--- a/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java
+++ b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java
@@ -15,8 +15,24 @@ public class StreamAggregateConfig {
encryptor.setPassword("galaxy");
}
+ /**
+ * Default split delimiter
+ */
public static final String FORMAT_SPLITTER = ",";
+ /**
+ * Protocol separator; escaped because it is used as a regex
+ */
public static final String PROTOCOL_SPLITTER = "\\.";
+ /**
+ * Tag marking whether a field is a log field or a schema-specified value
+ */
+ public static final String IS_JSON_KEY_TAG = "$.";
+
+ /**
+ * Separator joining the branches of an if-function condition
+ */
+ public static final String IF_CONDITION_SPLITTER = "=";
+
/**
* Nacos
@@ -27,7 +43,7 @@ public class StreamAggregateConfig {
public static final String NACOS_PIN = StreamAggregateConfigurations.getStringProperty(1, "nacos.pin");
public static final String NACOS_GROUP = StreamAggregateConfigurations.getStringProperty(1, "nacos.group");
public static final String NACOS_USERNAME = StreamAggregateConfigurations.getStringProperty(1, "nacos.username");
-
+
/**
* System
*/
diff --git a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java
index c2d4f31..570ea93 100644
--- a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java
+++ b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java
@@ -2,11 +2,12 @@ package com.zdjizhi.topology;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.StreamAggregateConfig;
-import com.zdjizhi.utils.functions.*;
import com.zdjizhi.utils.functions.keyby.FirstKeyByFunction;
import com.zdjizhi.utils.functions.keyby.SecondKeyByFunction;
import com.zdjizhi.utils.functions.parse.ParseMapFunction;
+import com.zdjizhi.utils.functions.result.ResultFlatMapFunction;
import com.zdjizhi.utils.functions.statistics.FirstCountWindowFunction;
import com.zdjizhi.utils.functions.statistics.SecondCountWindowFunction;
import com.zdjizhi.utils.kafka.KafkaConsumer;
@@ -21,6 +22,8 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTime
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import java.util.Map;
+
/**
* @author qidaijie
@@ -38,29 +41,37 @@ public class StreamAggregateTopology {
//两个输出之间的最大时间 (单位milliseconds)
environment.setBufferTimeout(StreamAggregateConfig.BUFFER_TIMEOUT);
+ //parse the original log
DataStream<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer())
.setParallelism(StreamAggregateConfig.SOURCE_PARALLELISM).name(StreamAggregateConfig.SOURCE_KAFKA_TOPIC);
- SingleOutputStreamOperator<Tuple3<String, String, String>> parseDataMap = streamSource.map(new ParseMapFunction())
+ //parse the original log and attach a custom salted key to mitigate data skew
+ SingleOutputStreamOperator<Tuple3<String, String, JSONObject>> parseDataMap = streamSource.map(new ParseMapFunction())
.name("ParseDataMap")
.setParallelism(StreamAggregateConfig.PARSE_PARALLELISM);
- WindowedStream<Tuple3<String, String, String>, String, TimeWindow> firstWindow = parseDataMap.keyBy(new FirstKeyByFunction())
+ //first-stage pre-aggregation, grouped by the custom salted key to mitigate data skew
+ WindowedStream<Tuple3<String, String, JSONObject>, String, TimeWindow> firstWindow = parseDataMap.keyBy(new FirstKeyByFunction())
.window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.FIRST_COUNT_WINDOW_TIME)));
- SingleOutputStreamOperator<Tuple2<String, String>> metricCountWindow = firstWindow.process(new FirstCountWindowFunction())
+ //first aggregation window
+ SingleOutputStreamOperator<Tuple2<String, JSONObject>> metricCountWindow = firstWindow.process(new FirstCountWindowFunction())
.name("FirstCountWindow")
.setParallelism(StreamAggregateConfig.FIRST_WINDOW_PARALLELISM);
- WindowedStream<Tuple2<String, String>, String, TimeWindow> secondWindow = metricCountWindow.keyBy(new SecondKeyByFunction())
+ //second-stage aggregation, grouped by the business key to merge the data
+ WindowedStream<Tuple2<String, JSONObject>, String, TimeWindow> secondWindow = metricCountWindow.keyBy(new SecondKeyByFunction())
.window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.SECOND_COUNT_WINDOW_TIME)));
- SingleOutputStreamOperator<String> secondCountWindow = secondWindow.process(new SecondCountWindowFunction())
+ //second aggregation window
+ SingleOutputStreamOperator<JSONObject> secondCountWindow = secondWindow.process(new SecondCountWindowFunction())
.name("SecondCountWindow").setParallelism(StreamAggregateConfig.SECOND_WINDOW_PARALLELISM);
+ //unpack the result and emit one record per protocol id prefix
SingleOutputStreamOperator<String> resultFlatMap = secondCountWindow.flatMap(new ResultFlatMapFunction())
.name("ResultFlatMap").setParallelism(StreamAggregateConfig.SINK_PARALLELISM);
+ //sink to kafka
resultFlatMap.addSink(KafkaProducer.getKafkaProducer()).name("LogSinkKafka")
.setParallelism(StreamAggregateConfig.SINK_PARALLELISM).name(StreamAggregateConfig.SINK_KAFKA_TOPIC);
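
The reworked topology keeps the two-stage shape but now passes JSONObject between operators instead of re-serializing to String at every hop. The skew handling is unchanged: the first keyBy groups on a salted key (dimensions plus a random suffix, built in ParseMapFunction), while the second keyBy groups on the bare dimensions string emitted by FirstCountWindowFunction, so partials for the same business key are merged. A standalone sketch of the salting idea (the RANDOM_RANGE_NUM value here is illustrative; the real one comes from config):

    import java.util.concurrent.ThreadLocalRandom;

    public class SaltedKeySketch {
        static final int RANDOM_RANGE_NUM = 8;   // illustrative value

        public static void main(String[] args) {
            String dimensions = "{\"protocol_stack_id\":\"eth.ipv4.tcp\"}";
            // stage 1: a hot key fans out over up to RANDOM_RANGE_NUM window subtasks
            String salted = dimensions + "@" + ThreadLocalRandom.current().nextInt(RANDOM_RANGE_NUM);
            // stage 2: grouping on the unsalted dimensions merges the partial aggregates again
            String unsalted = salted.substring(0, salted.lastIndexOf('@'));
            System.out.println(salted + "  ->  " + unsalted);
        }
    }
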
diff --git a/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java
deleted file mode 100644
index 57e8a2c..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package com.zdjizhi.utils.functions;
-
-import com.zdjizhi.common.StreamAggregateConfig;
-import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.json.JsonParseUtil;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.util.Collector;
-
-import java.util.Arrays;
-import java.util.Map;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/7/21 14:52
- */
-public class ResultFlatMapFunction implements FlatMapFunction<String, String> {
- private static String[] jobList = JsonParseUtil.getHierarchy();
- private static final String APP_NAME = "app_name";
-
- @Override
- @SuppressWarnings("unchecked")
- public void flatMap(String value, Collector out) throws Exception {
- StringBuffer stringBuffer = new StringBuffer();
- String name = jobList[0];
- Map<String, Object> jsonObject = (Map<String, Object>) JsonMapper.fromJsonString(value, Map.class);
- String protocol = JsonParseUtil.getString(jsonObject, name);
- String appName = JsonParseUtil.getString(jsonObject, APP_NAME);
- jsonObject.remove(APP_NAME);
- if (StringUtil.isNotBlank(protocol)) {
- String[] protocolIds = protocol.split(StreamAggregateConfig.PROTOCOL_SPLITTER);
- for (String proto : protocolIds) {
- if (StringUtil.isBlank(stringBuffer.toString())) {
- stringBuffer.append(proto);
- jsonObject.put(name, stringBuffer.toString());
- out.collect(JsonMapper.toJsonString(jsonObject));
- } else {
- stringBuffer.append(jobList[1]).append(proto);
- if (proto.equals(appName)) {
- jsonObject.put(APP_NAME, appName);
- }
- jsonObject.put(name, stringBuffer.toString());
- out.collect(JsonMapper.toJsonString(jsonObject));
- }
- }
- }
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/functions/filter/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/filter/FilterNullFunction.java
deleted file mode 100644
index 6c83b38..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/filter/FilterNullFunction.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package com.zdjizhi.utils.functions.filter;
-
-import com.zdjizhi.utils.StringUtil;
-import org.apache.flink.api.common.functions.FilterFunction;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/5/27 15:01
- */
-public class FilterNullFunction implements FilterFunction<String> {
- @Override
- public boolean filter(String message) {
- return StringUtil.isNotBlank(message);
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/functions/keyby/FirstKeyByFunction.java b/src/main/java/com/zdjizhi/utils/functions/keyby/FirstKeyByFunction.java
index 7783676..eed7c44 100644
--- a/src/main/java/com/zdjizhi/utils/functions/keyby/FirstKeyByFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/keyby/FirstKeyByFunction.java
@@ -1,10 +1,10 @@
package com.zdjizhi.utils.functions.keyby;
-import cn.hutool.core.util.RandomUtil;
-import com.zdjizhi.common.StreamAggregateConfig;
+import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
+
+import java.util.Map;
/**
* @author qidaijie
@@ -12,10 +12,10 @@ import org.apache.flink.api.java.tuple.Tuple4;
* @Description:
* @date 2021/7/21 12:13
*/
-public class FirstKeyByFunction implements KeySelector<Tuple3< String, String, String>, String> {
+public class FirstKeyByFunction implements KeySelector<Tuple3< String, String, JSONObject>, String> {
@Override
- public String getKey(Tuple3<String, String, String> value) throws Exception {
+ public String getKey(Tuple3<String, String, JSONObject> value) throws Exception {
//group by the key assembled in the map stage
return value.f0;
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/keyby/SecondKeyByFunction.java b/src/main/java/com/zdjizhi/utils/functions/keyby/SecondKeyByFunction.java
index fd81d6e..7fd1b27 100644
--- a/src/main/java/com/zdjizhi/utils/functions/keyby/SecondKeyByFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/keyby/SecondKeyByFunction.java
@@ -1,10 +1,10 @@
package com.zdjizhi.utils.functions.keyby;
-import cn.hutool.core.util.RandomUtil;
-import com.zdjizhi.common.StreamAggregateConfig;
+
+import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple4;
+
/**
* @author qidaijie
@@ -12,10 +12,10 @@ import org.apache.flink.api.java.tuple.Tuple4;
* @Description:
* @date 2021/7/21 12:13
*/
-public class SecondKeyByFunction implements KeySelector<Tuple2<String,String>, String> {
+public class SecondKeyByFunction implements KeySelector<Tuple2<String, JSONObject>, String> {
@Override
- public String getKey(Tuple2<String, String> value) throws Exception {
+ public String getKey(Tuple2<String, JSONObject> value) throws Exception {
//group by the key assembled in the map stage
return value.f0;
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/parse/ParseMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/parse/ParseMapFunction.java
index 4d69c82..a58b637 100644
--- a/src/main/java/com/zdjizhi/utils/functions/parse/ParseMapFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/parse/ParseMapFunction.java
@@ -2,16 +2,15 @@ package com.zdjizhi.utils.functions.parse;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.StreamAggregateConfig;
-import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.general.ParseFunctions;
-import com.zdjizhi.utils.json.JsonParseUtil;
+import com.zdjizhi.utils.meta.MetaDataParse;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
@@ -22,20 +21,19 @@ import java.util.concurrent.ThreadLocalRandom;
* @Description:
* @date 2021/5/27 15:01
*/
-public class ParseMapFunction implements MapFunction<String, Tuple3<String, String, String>> {
+public class ParseMapFunction implements MapFunction<String, Tuple3<String, String, JSONObject>> {
private static final Log logger = LogFactory.get();
@Override
@SuppressWarnings("unchecked")
- public Tuple3<String, String, String> map(String message) {
+ public Tuple3<String, String, JSONObject> map(String message) {
try {
- ArrayList<String[]> jobList = JsonParseUtil.getTransformsList();
- HashMap<String, String> dimensionsMap = JsonParseUtil.getDimensionsMap();
if (StringUtil.isNotBlank(message)) {
- Map<String, Object> originalLog = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
- Map<String, Object> dimensionsObj = ParseFunctions.transDimensions(dimensionsMap, originalLog);
+ JSONObject originalLog = JSON.parseObject(message);
+ Map<String, Object> dimensionsObj = ParseFunctions.transDimensions(MetaDataParse.getDimensionsMap(), originalLog);
if (ParseFunctions.filterLogs(originalLog)) {
- for (String[] strings : jobList) {
+ JSONObject metricsLog = ParseFunctions.getMetricsLog(originalLog);
+ for (String[] strings : MetaDataParse.getTransformsList()) {
//function name
String function = strings[0];
//result-set field key
@@ -44,9 +42,8 @@ public class ParseMapFunction implements MapFunction<String, Tuple3<String, Stri
String logsKeyName = strings[2];
//value of the extra parameters
String parameters = strings[3];
-
//value of the corresponding original-log field
- Object logsValue = JsonParseUtil.getValue(originalLog, logsKeyName);
+ Object logsValue = originalLog.get(logsKeyName);
switch (function) {
case "combination":
@@ -64,8 +61,8 @@ public class ParseMapFunction implements MapFunction<String, Tuple3<String, Stri
}
break;
case "hierarchy":
- String key = JsonParseUtil.getString(dimensionsObj, resultKeyName) + "@" + ThreadLocalRandom.current().nextInt(StreamAggregateConfig.RANDOM_RANGE_NUM);
- return new Tuple3<>(key, JsonMapper.toJsonString(dimensionsObj), ParseFunctions.getMetricsLog(originalLog));
+ String key = dimensionsObj.get(resultKeyName) + "@" + ThreadLocalRandom.current().nextInt(StreamAggregateConfig.RANDOM_RANGE_NUM);
+ return new Tuple3<>(key, JSONObject.toJSONString(dimensionsObj), metricsLog);
default:
break;
}
@@ -74,9 +71,9 @@ public class ParseMapFunction implements MapFunction<String, Tuple3<String, Stri
}
} catch (RuntimeException e) {
logger.error("An error occurred in the original log parsing reorganization,error message is:" + e);
- return new Tuple3<>("", "", "");
+ return new Tuple3<>("", "", null);
}
- return new Tuple3<>("", "", "");
+ return new Tuple3<>("", "", null);
}
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/result/ResultFlatMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/result/ResultFlatMapFunction.java
new file mode 100644
index 0000000..ec98c28
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/result/ResultFlatMapFunction.java
@@ -0,0 +1,51 @@
+package com.zdjizhi.utils.functions.result;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson2.JSONObject;
+import com.zdjizhi.common.StreamAggregateConfig;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.general.ParseFunctions;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/7/21 14:52
+ */
+public class ResultFlatMapFunction implements FlatMapFunction<JSONObject, String> {
+ private static final Log logger = LogFactory.get();
+
+ private static final String PROTOCOL_ID_KEY = "protocol_stack_id";
+ private static final String APP_NAME_KEY = "app_name";
+ private static final String HLL_SKETCH_KEY = "client_ip_sketch";
+
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void flatMap(JSONObject jsonObject, Collector<String> out) throws Exception {
+ String protocol = jsonObject.getString(PROTOCOL_ID_KEY);
+ if (jsonObject.containsKey(HLL_SKETCH_KEY)) {
+ jsonObject.put(HLL_SKETCH_KEY, ParseFunctions.getHllSketch(jsonObject, HLL_SKETCH_KEY));
+ }
+ out.collect(jsonObject.toString());
+ jsonObject.remove(APP_NAME_KEY);
+
+ StringBuilder stringBuilder = new StringBuilder();
+ String[] protocolIds = protocol.split(StreamAggregateConfig.PROTOCOL_SPLITTER);
+ int protocolIdsNum = protocolIds.length;
+ for (int i = 0; i < protocolIdsNum - 1; i++) {
+ if (StringUtil.isBlank(stringBuilder.toString())) {
+ stringBuilder.append(protocolIds[i]);
+ jsonObject.put(PROTOCOL_ID_KEY, stringBuilder.toString());
+ out.collect(jsonObject.toString());
+ } else {
+ stringBuilder.append(".").append(protocolIds[i]);
+ jsonObject.put(PROTOCOL_ID_KEY, stringBuilder.toString());
+ out.collect(jsonObject.toString());
+ }
+ }
+ }
+}
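
For a record whose protocol_stack_id is "eth.ipv4.tcp.http", the function above first emits the record unchanged (with the client_ip_sketch field Base64-encoded if present), then removes app_name and re-emits it once per proper prefix: eth, eth.ipv4, eth.ipv4.tcp. A standalone sketch of just the prefix loop:

    public class PrefixExpansionSketch {
        public static void main(String[] args) {
            String protocol = "eth.ipv4.tcp.http";
            String[] protocolIds = protocol.split("\\.");
            StringBuilder sb = new StringBuilder();
            // same loop bound as above: every prefix except the full stack,
            // which was already collected before the loop
            for (int i = 0; i < protocolIds.length - 1; i++) {
                if (sb.length() > 0) {
                    sb.append('.');
                }
                sb.append(protocolIds[i]);
                System.out.println(sb);   // eth / eth.ipv4 / eth.ipv4.tcp
            }
        }
    }
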
diff --git a/src/main/java/com/zdjizhi/utils/functions/statistics/FirstCountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/statistics/FirstCountWindowFunction.java
index 93844be..b42f107 100644
--- a/src/main/java/com/zdjizhi/utils/functions/statistics/FirstCountWindowFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/statistics/FirstCountWindowFunction.java
@@ -1,20 +1,19 @@
package com.zdjizhi.utils.functions.statistics;
-import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.StringUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.utils.general.MetricFunctions;
-import com.zdjizhi.utils.general.ParseFunctions;
-import com.zdjizhi.utils.json.JsonParseUtil;
+import com.zdjizhi.utils.meta.MetaDataParse;
+import org.apache.datasketches.hll.HllSketch;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.HashMap;
-import java.util.Map;
/**
* @author qidaijie
@@ -22,44 +21,34 @@ import java.util.Map;
* @Description:
* @date 2021/7/21 13:55
*/
-public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple3<String, String, String>, Tuple2<String, String>, String, TimeWindow> {
- private static final Logger logger = LoggerFactory.getLogger(FirstCountWindowFunction.class);
+public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple3<String, String, JSONObject>, Tuple2<String, JSONObject>, String, TimeWindow> {
+ private static final Log logger = LogFactory.get();
- private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(16);
+ private HashMap<String, JSONObject> cacheMap = new HashMap<>(32);
@Override
@SuppressWarnings("unchecked")
- public void process(String key, Context context, Iterable<Tuple3<String, String, String>> input, Collector<Tuple2<String, String>> output) {
+ public void process(String key, Context context, Iterable<Tuple3<String, String, JSONObject>> input, Collector<Tuple2<String, JSONObject>> output) {
try {
- HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricFunctionsMap();
- HashMap<String, String[]> actionMap = JsonParseUtil.getActionMap();
- for (Tuple3<String, String, String> tuple : input) {
- String groupKey = tuple.f0;
- String protocol = groupKey.substring(0, groupKey.indexOf("@"));
+ HashMap<String, String[]> metricsMap = MetaDataParse.getMetricFunctionsMap();
+ for (Tuple3<String, String, JSONObject> tuple : input) {
String dimensions = tuple.f1;
- String metrics = tuple.f2;
- //all actions for this protocol from the action map, falling back to the default if absent
- String[] protocolMetrics = actionMap.getOrDefault(protocol, actionMap.get("Default"));
- if (StringUtil.isNotBlank(metrics)) {
- Map<String, Object> dimensionsObj = (Map<String, Object>) JsonMapper.fromJsonString(dimensions, Map.class);
- Map<String, Object> metricsObj = (Map<String, Object>) JsonMapper.fromJsonString(metrics, Map.class);
+ JSONObject metrics = tuple.f2;
+ JSONObject cacheMessage = cacheMap.getOrDefault(dimensions, JSON.parseObject(dimensions));
- Map<String, Object> cacheMessage = cacheMap.getOrDefault(dimensions, dimensionsObj);
- for (String protocolMetric : protocolMetrics) {
- String[] functions = metricsMap.get(protocolMetric);
- String function = functions[0];
- String fieldName = functions[1];
- functionSet(function, cacheMessage, protocolMetric, cacheMessage.get(protocolMetric), JsonParseUtil.getValue(metricsObj, fieldName));
-
- }
- cacheMap.put(dimensions, cacheMessage);
+ for (String resultKeyName : metricsMap.keySet()) {
+ String[] functions = metricsMap.get(resultKeyName);
+ String function = functions[0];
+ String fieldName = functions[1];
+ functionSet(function, cacheMessage, resultKeyName, metrics.get(fieldName));
}
+ cacheMap.put(dimensions, cacheMessage);
}
+
if (!cacheMap.isEmpty()) {
for (String dimensions : cacheMap.keySet()) {
- Map<String, Object> resultMap = cacheMap.get(dimensions);
- output.collect(new Tuple2<>(dimensions, JsonMapper.toJsonString(resultMap)));
+ output.collect(new Tuple2<>(dimensions, cacheMap.get(dimensions)));
}
}
@@ -71,27 +60,25 @@ public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple3<Strin
}
}
+
/**
* Dispatch of the per-field update functions described by the schema
*
* @param function function name
* @param cacheMessage result set
- * @param nameValue current value
+ * @param resultKeyName result field name
* @param fieldNameValue newly arrived value
*/
- private static void functionSet(String function, Map<String, Object> cacheMessage, String resultName, Object nameValue, Object fieldNameValue) {
+ private static void functionSet(String function, JSONObject cacheMessage, String resultKeyName, Object fieldNameValue) {
switch (function) {
case "sum":
- cacheMessage.put(resultName, MetricFunctions.longSum(nameValue, fieldNameValue));
+ cacheMessage.put(resultKeyName, MetricFunctions.longSum(cacheMessage.get(resultKeyName), fieldNameValue));
break;
case "count":
- cacheMessage.put(resultName, MetricFunctions.count(nameValue));
- break;
- case "unique_sip_num":
- //TODO
+ cacheMessage.put(resultKeyName, MetricFunctions.count(cacheMessage.get(resultKeyName)));
break;
- case "unique_cip_num":
- //TODO
+ case "HLLSketchBuild":
+ cacheMessage.put(resultKeyName, MetricFunctions.uniqueHllSketch((HllSketch) cacheMessage.get(resultKeyName), fieldNameValue.toString()));
break;
default:
break;
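
functionSet is now driven entirely by the schema's metrics map (result key mapped to {function, source field}); the per-protocol action list is gone, and the new HLLSketchBuild case feeds raw values into one HllSketch per cached key. A standalone sketch of the three cases, with hypothetical field names:

    import org.apache.datasketches.hll.HllSketch;
    import java.util.HashMap;

    public class FunctionSetSketch {
        public static void main(String[] args) {
            HashMap<String, Object> cache = new HashMap<>();     // one entry of cacheMap
            HashMap<String, Object> metrics = new HashMap<>();   // one incoming metrics record
            metrics.put("c2s_byte_num", 1500L);
            metrics.put("client_ip", "10.0.0.1");

            // "sum": add the incoming value onto the cached value
            long prev = (long) cache.getOrDefault("c2s_byte_num", 0L);
            cache.put("c2s_byte_num", prev + (long) metrics.get("c2s_byte_num"));

            // "count": one increment per record
            cache.put("session_num", (long) cache.getOrDefault("session_num", 0L) + 1L);

            // "HLLSketchBuild": feed the raw value into a per-key sketch
            HllSketch s = (HllSketch) cache.computeIfAbsent("client_ip_sketch", k -> new HllSketch(12));
            s.update((String) metrics.get("client_ip"));

            System.out.println(cache.get("c2s_byte_num") + " / " + cache.get("session_num")
                    + " / " + ((HllSketch) cache.get("client_ip_sketch")).getEstimate());
        }
    }
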
diff --git a/src/main/java/com/zdjizhi/utils/functions/statistics/SecondCountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/statistics/SecondCountWindowFunction.java
index c0b2091..e46da07 100644
--- a/src/main/java/com/zdjizhi/utils/functions/statistics/SecondCountWindowFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/statistics/SecondCountWindowFunction.java
@@ -1,15 +1,17 @@
package com.zdjizhi.utils.functions.statistics;
-import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.StringUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.utils.general.MetricFunctions;
-import com.zdjizhi.utils.json.JsonParseUtil;
+import com.zdjizhi.utils.general.ParseFunctions;
+import com.zdjizhi.utils.meta.MetaDataParse;
+import org.apache.datasketches.hll.HllSketch;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
@@ -20,28 +22,26 @@ import java.util.Map;
* @Description:
* @date 2021/7/21 13:55
*/
-public class SecondCountWindowFunction extends ProcessWindowFunction<Tuple2<String, String>, String, String, TimeWindow> {
- private static final Logger logger = LoggerFactory.getLogger(SecondCountWindowFunction.class);
+public class SecondCountWindowFunction extends ProcessWindowFunction<Tuple2<String, JSONObject>, JSONObject, String, TimeWindow> {
+ private static final Log logger = LogFactory.get();
- private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(16);
+ private HashMap<String, JSONObject> cacheMap = new HashMap<>(32);
@Override
@SuppressWarnings("unchecked")
- public void process(String key, Context context, Iterable<Tuple2<String, String>> input, Collector<String> output) {
+ public void process(String key, Context context, Iterable<Tuple2<String, JSONObject>> input, Collector<JSONObject> output) {
try {
- HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricFunctionsMap();
- for (Tuple2<String, String> tuple : input) {
+ HashMap<String, String[]> metricsMap = MetaDataParse.getMetricFunctionsMap();
+ for (Tuple2<String, JSONObject> tuple : input) {
String dimensions = tuple.f0;
- String message = tuple.f1;
- if (StringUtil.isNotBlank(message)) {
- Map<String, Object> dimensionsObj = (Map<String, Object>) JsonMapper.fromJsonString(dimensions, Map.class);
- Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
+ Map<String, Object> message = tuple.f1;
+ if (message.size() != 0) {
+ JSONObject cacheMessage = cacheMap.getOrDefault(dimensions, JSON.parseObject(dimensions));
- Map<String, Object> cacheMessage = cacheMap.getOrDefault(dimensions, dimensionsObj);
- for (String name : metricsMap.keySet()) {
- String[] metrics = metricsMap.get(name);
+ for (String resultName : metricsMap.keySet()) {
+ String[] metrics = metricsMap.get(resultName);
String function = metrics[0];
- functionSet(function, cacheMessage, name, cacheMessage.get(name), JsonParseUtil.getValue(object, name));
+ functionSet(function, cacheMessage, resultName, message.get(resultName));
}
cacheMap.put(dimensions, cacheMessage);
@@ -52,9 +52,9 @@ public class SecondCountWindowFunction extends ProcessWindowFunction<Tuple2<Stri
Long endTime = context.window().getEnd() / 1000;
for (String countKey : cacheMap.keySet()) {
- Map<String, Object> resultMap = cacheMap.get(countKey);
- JsonParseUtil.setValue(resultMap, JsonParseUtil.getResultTimeKey(), endTime);
- output.collect(JsonMapper.toJsonString(resultMap));
+ JSONObject resultMap = cacheMap.get(countKey);
+ resultMap.put(MetaDataParse.getResultTimeKey(), endTime);
+ output.collect(resultMap);
}
}
@@ -71,22 +71,19 @@ public class SecondCountWindowFunction extends ProcessWindowFunction<Tuple2<Stri
*
* @param function function name
* @param cacheMessage result set
- * @param nameValue current value
+ * @param resultKeyName result field name
* @param fieldNameValue newly arrived value
*/
- private static void functionSet(String function, Map<String, Object> cacheMessage, String resultName, Object nameValue, Object fieldNameValue) {
+ private static void functionSet(String function, Map<String, Object> cacheMessage, String resultKeyName, Object fieldNameValue) {
switch (function) {
case "sum":
- cacheMessage.put(resultName, MetricFunctions.longSum(nameValue, fieldNameValue));
+ cacheMessage.put(resultKeyName, MetricFunctions.longSum(cacheMessage.get(resultKeyName), fieldNameValue));
break;
case "count":
- cacheMessage.put(resultName, MetricFunctions.count(nameValue));
+ cacheMessage.put(resultKeyName, MetricFunctions.count(cacheMessage.get(resultKeyName)));
break;
- case "unique_sip_num":
- //TODO
- break;
- case "unique_cip_num":
- //TODO
+ case "HLLSketchBuild":
+ cacheMessage.put(resultKeyName, MetricFunctions.hllSketchUnion((HllSketch) cacheMessage.get(resultKeyName), (HllSketch) fieldNameValue));
break;
default:
break;
diff --git a/src/main/java/com/zdjizhi/utils/general/MetricFunctions.java b/src/main/java/com/zdjizhi/utils/general/MetricFunctions.java
index 0672179..e0d2e03 100644
--- a/src/main/java/com/zdjizhi/utils/general/MetricFunctions.java
+++ b/src/main/java/com/zdjizhi/utils/general/MetricFunctions.java
@@ -1,7 +1,12 @@
package com.zdjizhi.utils.general;
-import com.zdjizhi.utils.json.JsonTypeUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.datasketches.hll.HllSketch;
+import org.apache.datasketches.hll.Union;
+
/**
* @author qidaijie
@@ -10,6 +15,9 @@ import com.zdjizhi.utils.json.JsonTypeUtil;
* @date 2021/7/20 15:31
*/
public class MetricFunctions {
+ private static final Log logger = LogFactory.get();
+
+
/**
* Sum two Long-typed values
*
@@ -18,8 +26,8 @@ public class MetricFunctions {
* @return value1 + value2
*/
public static Long longSum(Object value1, Object value2) {
- Long res1 = JsonTypeUtil.checkLongValue(value1);
- Long res2 = JsonTypeUtil.checkLongValue(value2);
+ Long res1 = checkLongValue(value1);
+ Long res2 = checkLongValue(value2);
return res1 + res2;
}
@@ -31,7 +39,68 @@ public class MetricFunctions {
* @return count+1
*/
public static Long count(Object count) {
+ return checkLongValue(count) + 1L;
+ }
+
+ /**
+ * Update the HllSketch with a new value
+ *
+ * @param sketch current sketch
+ * @param str ip address
+ * @return updated sketch
+ */
+ public static HllSketch uniqueHllSketch(HllSketch sketch, String str) {
+ if (StringUtil.isNotBlank(str)) {
+ if (sketch != null) {
+ sketch.update(str);
+ } else {
+ sketch = new HllSketch(12);
+ sketch.update(str);
+ }
+ }
+
+ return sketch;
+ }
+
+ /**
+ * @param cacheSketch cached sketch
+ * @param newSketch newly aggregated sketch
+ * @return merged sketch
+ */
+ public static HllSketch hllSketchUnion(HllSketch cacheSketch, HllSketch newSketch) {
+ Union union = new Union(12);
+ if (cacheSketch != null) {
+ union.update(cacheSketch);
+ }
+ if (newSketch != null) {
+ union.update(newSketch);
+ }
+ return HllSketch.heapify(union.getResult().toUpdatableByteArray());
+ }
+
+
+ private static long checkLongValue(Object value) {
+ if (value == null) {
+ return 0L;
+ }
+
+ if (value instanceof Long) {
+ return ((Long) value);
+ }
+
+ if (value instanceof Number) {
+ return ((Number) value).longValue();
+ }
+
+ if (value instanceof String) {
+ String str = (String) value;
+ try {
+ return Long.parseLong(str);
+ } catch (NumberFormatException e) {
+ logger.error("Can not cast " + value.getClass() + "to Long,exception is:" + e.getMessage());
+ }
+ }
- return JsonTypeUtil.checkLongValue(count) + 1L;
+ return 0L;
}
}
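
The split between uniqueHllSketch (first window: update a sketch with raw values) and hllSketchUnion (second window: merge partial sketches) is a build/merge pattern; duplicates across partials collapse in the union. A standalone sketch at the same lgK = 12:

    import org.apache.datasketches.hll.HllSketch;
    import org.apache.datasketches.hll.Union;

    public class HllMergeSketch {
        public static void main(String[] args) {
            // two first-window partials for the same dimensions key
            HllSketch a = new HllSketch(12);
            HllSketch b = new HllSketch(12);
            for (int i = 0; i < 50; i++) a.update("192.168.1." + i);
            for (int i = 0; i < 10; i++) b.update("192.168.1." + i);   // fully overlaps a

            // second-window merge, exactly as hllSketchUnion above
            Union union = new Union(12);
            union.update(a);
            union.update(b);
            HllSketch merged = HllSketch.heapify(union.getResult().toUpdatableByteArray());
            System.out.println(merged.getEstimate());   // ~50, not 60: duplicates collapse
        }
    }
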
diff --git a/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java b/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java
index 0c8d7df..f8b2a46 100644
--- a/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java
+++ b/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java
@@ -3,14 +3,15 @@ package com.zdjizhi.utils.general;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.jayway.jsonpath.InvalidPathException;
+import com.alibaba.fastjson2.JSONObject;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.common.StreamAggregateConfig;
-import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.json.JsonParseUtil;
+import com.zdjizhi.utils.meta.MetaDataParse;
+import org.apache.datasketches.hll.HllSketch;
import java.util.ArrayList;
+import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
@@ -30,18 +31,15 @@ public class ParseFunctions {
* @param object original log
* @return true or false
*/
- public static boolean filterLogs(Map<String, Object> object) {
+ public static boolean filterLogs(JSONObject object) {
boolean available = false;
- HashMap<String, String> filtersMap = JsonParseUtil.getFiltersMap();
+ HashMap<String, String> filtersMap = MetaDataParse.getFiltersMap();
for (String key : filtersMap.keySet()) {
- switch (key) {
- case "notempty":
- String value = JsonParseUtil.getString(object, filtersMap.get(key));
- if (StringUtil.isNotBlank(value)) {
- available = true;
- }
- break;
- default:
+ if ("notempty".equals(key)) {
+ String value = object.getString(filtersMap.get(key));
+ if (StringUtil.isNotBlank(value)) {
+ available = true;
+ }
}
}
return available;
@@ -50,15 +48,16 @@ public class ParseFunctions {
/**
* Parse the dimensions field set
*
- * @param dimensions dimension set
- * @param originalLog original log
+ * @param dimensions  dimension set
+ * @param originalLog original log
* @return resulting dimension set
*/
- public static Map<String, Object> transDimensions(Map<String, String> dimensions, Map<String, Object> originalLog) {
+ public static Map<String, Object> transDimensions(Map<String, String> dimensions, JSONObject originalLog) {
HashMap<String, Object> dimensionsObj = new HashMap<>(16);
- for (String dimension : dimensions.keySet()) {
- dimensionsObj.put(dimension, JsonParseUtil.getValue(originalLog, dimensions.get(dimension)));
+ for (String key : dimensions.keySet()) {
+ dimensionsObj.put(key, originalLog.get(dimensions.get(key)));
}
return dimensionsObj;
@@ -68,57 +67,44 @@ public class ParseFunctions {
* Build the metrics json specified in the schema from the original log fields.
*
* @param originalLog original log json
- * @return metrics statistics json
+ * @return metrics statistics meta
*/
- public static String getMetricsLog(Map<String, Object> originalLog) {
- Map<String, Object> json = new HashMap<>(16);
+ public static JSONObject getMetricsLog(JSONObject originalLog) {
+ JSONObject metricsJson = new JSONObject();
- for (String logsKeyName : JsonParseUtil.getMetricsFiledNameList()) {
- json.put(logsKeyName, originalLog.get(logsKeyName));
+ for (String logsKeyName : MetaDataParse.getMetricsFiledNameList()) {
+ if (originalLog.containsKey(logsKeyName)) {
+ metricsJson.put(logsKeyName, originalLog.get(logsKeyName));
+ }
}
-
- return JsonMapper.toJsonString(json);
+ return metricsJson;
}
/**
- * alignment ID replacement
- * Uses the cached AppId mapping to resolve the concrete name for the current AppId.
- *
- * @param parameters parameter set
- * @param fieldName original log column name
- */
- public static String dismantlingUtils(String parameters, Object fieldName) {
- String[] alignmentPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
- int digits = Integer.parseInt(alignmentPars[0]);
- return fieldName.toString().split(StreamAggregateConfig.PROTOCOL_SPLITTER)[digits];
- }
-
- /**
* combination concatenation
* Reads the function's parameters field, e.g. "parameters": "abc,/" where abc is the field to concatenate and / is the separator
*
* @param parameters parameter set
- * @param message original log
+ * @param originalLog original log
* @param logsKeyName original log column name
*/
- public static void combinationUtils(Map<String, Object> dimensions, Map<String, Object> message, String parameters, String resultKeyName, String logsKeyName) {
+ public static void combinationUtils(Map<String, Object> dimensions, JSONObject originalLog, String parameters, String resultKeyName, String logsKeyName) {
String[] combinationPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
String combinationFieldKey = combinationPars[0];
String separator = combinationPars[1];
- Object combinationFieldValue = JsonParseUtil.getValue(message, combinationFieldKey);
+ Object combinationFieldValue = originalLog.get(combinationFieldKey);
if (combinationFieldValue != null) {
- Object logsFieldValue = JsonParseUtil.getValue(message, logsKeyName);
+ Object logsFieldValue = originalLog.get(logsKeyName);
String combinationResult = logsFieldValue + separator + combinationFieldValue;
- JsonParseUtil.setValue(dimensions, resultKeyName, combinationResult);
- JsonParseUtil.setValue(message, logsKeyName, combinationResult);
+ dimensions.put(resultKeyName, combinationResult);
}
}
/**
* Parse json by expression
* <p>
- * //* @param message json
+ * //* @param message meta
*
* @param expr parse expression
* @return parse result
@@ -137,4 +123,80 @@ public class ParseFunctions {
}
}
+ /**
+ * Checks whether the parameter names a log field: if so returns the field value, otherwise returns the raw string
+ *
+ * @param jsonMap in-memory entity
+ * @param param field name / plain string
+ * @return JSON.Value or String
+ */
+ @Deprecated
+ private static Object isJsonValue(Map<String, Object> jsonMap, String param) {
+ if (param.contains(StreamAggregateConfig.IS_JSON_KEY_TAG)) {
+ return jsonMap.get(param.substring(2));
+ } else {
+ return param;
+ }
+ }
+
+ /**
+ * IF-function implementation: parses the log to evaluate a ternary expression; numeric operands are compared numerically and results are returned as-is.
+ *
+ * @param jsonMap in-memory entity
+ * @param ifParam field name / plain string
+ * @return resultA or resultB or null
+ */
+ @Deprecated
+ public static Object condition(Map<String, Object> jsonMap, String ifParam) {
+ Object result = null;
+ String separator = "!=";
+ try {
+ String[] split = ifParam.split(StreamAggregateConfig.FORMAT_SPLITTER);
+ if (split.length == 3) {
+ String expression = split[0];
+ Object resultA = isJsonValue(jsonMap, split[1]);
+ Object resultB = isJsonValue(jsonMap, split[2]);
+ if (expression.contains(separator)) {
+ String[] regexp = expression.split(separator);
+ Object direction = isJsonValue(jsonMap, regexp[0]);
+ if (direction instanceof Number) {
+ result = Integer.parseInt(direction.toString()) != Integer.parseInt(regexp[1]) ? resultA : resultB;
+ } else if (direction instanceof String) {
+ result = direction.equals(regexp[1]) ? resultA : resultB;
+ }
+ } else {
+ String[] regexp = expression.split(StreamAggregateConfig.IF_CONDITION_SPLITTER);
+ Object direction = isJsonValue(jsonMap, regexp[0]);
+ if (direction instanceof Number) {
+ result = Integer.parseInt(direction.toString()) == Integer.parseInt(regexp[1]) ? resultA : resultB;
+ } else if (direction instanceof String) {
+ result = direction.equals(regexp[1]) ? resultA : resultB;
+ }
+ }
+ }
+ } catch (RuntimeException e) {
+ logger.error("IF function execution exception, exception information:" + e.getMessage());
+ }
+ return result;
+ }
+
+ /**
+ * Serialize the HLLSketch content
+ *
+ * @param jsonMap original log
+ * @param key meta key name
+ * @return Base64-encoded HLLSketch byte array
+ */
+ public static String getHllSketch(JSONObject jsonMap, String key) {
+ try {
+ HllSketch hllSketchResult = (HllSketch) jsonMap.get(key);
+ if (hllSketchResult != null) {
+ return Base64.getEncoder().encodeToString(hllSketchResult.toUpdatableByteArray());
+ }
+ } catch (RuntimeException e) {
+ logger.error("HllSketch data conversion exception,data may be empty! exception:{}", e);
+ }
+ return null;
+ }
+
}
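
getHllSketch Base64-encodes the updatable byte image of the sketch, so a downstream consumer can rebuild the sketch and read the estimate. A round-trip sketch (the consumer side shown here is an assumption, not part of this change):

    import org.apache.datasketches.hll.HllSketch;
    import java.util.Base64;

    public class SketchRoundTrip {
        public static void main(String[] args) {
            HllSketch sketch = new HllSketch(12);
            sketch.update("10.0.0.1");

            // producer side: what getHllSketch puts on the wire
            String encoded = Base64.getEncoder().encodeToString(sketch.toUpdatableByteArray());

            // hypothetical consumer side: decode and heapify to recover the estimate
            HllSketch decoded = HllSketch.heapify(Base64.getDecoder().decode(encoded));
            System.out.println(decoded.getEstimate());   // ~1.0
        }
    }
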
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java
deleted file mode 100644
index 8555b1f..0000000
--- a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtil.java
+++ /dev/null
@@ -1,140 +0,0 @@
-package com.zdjizhi.utils.json;
-
-
-import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.exception.AnalysisException;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * @author qidaijie
- * @Package PACKAGE_NAME
- * @Description:
- * @date 2021/7/12 17:34
- */
-public class JsonTypeUtil {
- /**
- * String type check-and-convert method
- *
- * @param value json value
- * @return String value
- */
- public static String checkString(Object value) {
- if (value == null) {
- return null;
- }
-
- if (value instanceof Map) {
- return JsonMapper.toJsonString(value);
- }
-
- if (value instanceof List) {
- return JsonMapper.toJsonString(value);
- }
-
- return value.toString();
- }
-
- /**
- * array type check-and-convert method
- *
- * @param value json value
- * @return List value
- */
- private static Map checkObject(Object value) {
- if (value == null) {
- return null;
- }
-
- if (value instanceof Map) {
- return (Map) value;
- }
-
- throw new AnalysisException("can not cast to map, value : " + value);
- }
-
- /**
- * array type check-and-convert method
- *
- * @param value json value
- * @return List value
- */
- private static List checkArray(Object value) {
- if (value == null) {
- return null;
- }
-
- if (value instanceof List) {
- return (List) value;
- }
-
- throw new AnalysisException("can not cast to List, value : " + value);
- }
-
- private static Long checkLong(Object value) {
- if (value == null) {
- return null;
- }
-
- return TypeUtils.castToLong(value);
- }
-
- /**
- * long type check-and-convert; returns a default value when null
- *
- * @param value json value
- * @return Long value
- */
- public static long checkLongValue(Object value) {
-
- Long longVal = TypeUtils.castToLong(value);
-
- if (longVal == null) {
- return 0L;
- }
-
- return longVal;
- }
-
- /**
- * Double type check-and-convert method
- *
- * @param value json value
- * @return Double value
- */
- private static Double checkDouble(Object value) {
- if (value == null) {
- return null;
- }
-
- return TypeUtils.castToDouble(value);
- }
-
-
- private static Integer checkInt(Object value) {
- if (value == null) {
- return null;
- }
-
- return TypeUtils.castToInt(value);
- }
-
-
- /**
- * int type check-and-convert; returns a default value when null
- *
- * @param value json value
- * @return int value
- */
- private static int getIntValue(Object value) {
-
- Integer intVal = TypeUtils.castToInt(value);
-
- if (intVal == null) {
- return 0;
- }
- return intVal;
- }
-
-}
diff --git a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java
deleted file mode 100644
index 9cb0631..0000000
--- a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java
+++ /dev/null
@@ -1,172 +0,0 @@
-package com.zdjizhi.utils.json;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.StreamAggregateConfig;
-import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.exception.AnalysisException;
-
-
-/**
- * @author qidaijie
- * @Package PACKAGE_NAME
- * @Description:
- * @date 2021/7/12 18:20
- */
-public class TypeUtils {
- private static final Log logger = LogFactory.get();
-
- /**
- * Integer type check method
- *
- * @param value json value
- * @return Integer value or null
- */
- public static Object castToIfFunction(Object value) {
- if (value == null) {
- return null;
- }
-
- if (value instanceof String) {
- return value.toString();
- }
-
- if (value instanceof Integer) {
- return ((Number) value).intValue();
- }
-
- if (value instanceof Long) {
- return ((Number) value).longValue();
- }
-
- if (value instanceof Boolean) {
- return (Boolean) value ? 1 : 0;
- }
-
- throw new AnalysisException("can not cast to int, value : " + value);
- }
-
- /**
- * Integer type check method
- *
- * @param value json value
- * @return Integer value or null
- */
- static Integer castToInt(Object value) {
-
- if (value == null) {
- return null;
- }
-
- if (value instanceof Integer) {
- return (Integer) value;
- }
-
- //this check does not throw on out-of-range values; it truncates to the target type
-// if (value instanceof Number) {
-// return ((Number) value).intValue();
-// }
-
- if (value instanceof String) {
- String strVal = (String) value;
- if (StringUtil.isBlank(strVal)) {
- return null;
- }
-
- //converts values like "10,20" to 10
- if (strVal.contains(StreamAggregateConfig.FORMAT_SPLITTER)) {
- strVal = strVal.split(StreamAggregateConfig.FORMAT_SPLITTER)[0];
- }
-
- try {
- return Integer.parseInt(strVal);
- } catch (NumberFormatException ex) {
- logger.error("String change Integer Error,The error Str is:" + strVal);
- }
- }
-
- if (value instanceof Boolean) {
- return (Boolean) value ? 1 : 0;
- }
-
- throw new AnalysisException("can not cast to int, value : " + value);
- }
-
- /**
- * Double type check method
- *
- * @param value json value
- * @return double value or null
- */
- static Double castToDouble(Object value) {
-
- if (value instanceof Double) {
- return (Double) value;
- }
-
- //this check does not throw on out-of-range values; it truncates to the target type
-// if (value instanceof Number) {
-// return ((Number) value).doubleValue();
-// }
-
- if (value instanceof String) {
- String strVal = (String) value;
-
- if (StringUtil.isBlank(strVal)) {
- return null;
- }
-
- //converts values like "10,20" to 10
- if (strVal.contains(StreamAggregateConfig.FORMAT_SPLITTER)) {
- strVal = strVal.split(StreamAggregateConfig.FORMAT_SPLITTER)[0];
- }
-
- try {
- return Double.parseDouble(strVal);
- } catch (NumberFormatException ex) {
- logger.error("String change Double Error,The error Str is:" + strVal);
- }
- }
-
- throw new AnalysisException("can not cast to double, value : " + value);
- }
-
- /**
- * Long type check method
- *
- * @param value json value
- * @return (Long)value or null
- */
- static Long castToLong(Object value) {
- if (value == null) {
- return null;
- }
-
-// this check does not throw on out-of-range values; it truncates to the target type
- if (value instanceof Number) {
- return ((Number) value).longValue();
- }
-
- if (value instanceof String) {
- String strVal = (String) value;
-
- if (StringUtil.isBlank(strVal)) {
- return null;
- }
-
- //converts values like "10,20" to 10
- if (strVal.contains(StreamAggregateConfig.FORMAT_SPLITTER)) {
- strVal = strVal.split(StreamAggregateConfig.FORMAT_SPLITTER)[0];
- }
-
- try {
- return Long.parseLong(strVal);
- } catch (NumberFormatException ex) {
- logger.error("String change Long Error,The error Str is:" + strVal);
- }
- }
-
- throw new AnalysisException("can not cast to long, value : " + value);
- }
-
-}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
index 545a0e3..d9f1b37 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
@@ -22,6 +22,7 @@ public class KafkaConsumer {
properties.put("session.timeout.ms", StreamAggregateConfig.SESSION_TIMEOUT_MS);
properties.put("max.poll.records", StreamAggregateConfig.MAX_POLL_RECORDS);
properties.put("max.partition.fetch.bytes", StreamAggregateConfig.MAX_PARTITION_FETCH_BYTES);
+ properties.put("partition.discovery.interval.ms", "10000");
CertUtils.chooseCert(StreamAggregateConfig.SOURCE_KAFKA_SERVERS, properties);
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/meta/MetaDataParse.java
index 4b3f75a..21073ac 100644
--- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
+++ b/src/main/java/com/zdjizhi/utils/meta/MetaDataParse.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.utils.json;
+package com.zdjizhi.utils.meta;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
@@ -10,29 +10,23 @@ import com.alibaba.nacos.api.exception.NacosException;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.common.StreamAggregateConfig;
-import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
-import net.sf.cglib.beans.BeanMap;
import java.util.*;
import java.util.concurrent.Executor;
+
/**
* Utility class for parsing json with FastJson
*
* @author qidaijie
*/
-public class JsonParseUtil {
+public class MetaDataParse {
private static final Log logger = LogFactory.get();
private static Properties propNacos = new Properties();
/**
- * All computation functions from actions
- */
- private static HashMap<String, String[]> actionMap = new HashMap<>(16);
-
- /**
* Parsed metrics field info
*/
private static HashMap<String, String[]> metricFunctionsMap = new HashMap<>(16);
@@ -58,11 +52,6 @@ public class JsonParseUtil {
private static ArrayList<String> metricsFiledNameList = new ArrayList<>();
/**
- * Parsed hierarchy-function split info
- */
- private static String[] hierarchy;
-
- /**
* Name of the parsed timestamp field
*/
private static String resultTimeKey = "stat_time";
@@ -99,70 +88,6 @@ public class JsonParseUtil {
}
/**
- * Get a property value
- *
- * @param jsonMap original log
- * @param key json key name
- * @return property value
- */
- public static Object getValue(Map<String, Object> jsonMap, String key) {
- try {
- return jsonMap.getOrDefault(key, null);
- } catch (RuntimeException e) {
- logger.error("Get the JSON value is abnormal,The key is :" + key + "error message is :" + e);
- return null;
- }
- }
-
- /**
- * long type check-and-convert; returns a default value when null
- *
- * @return Long value
- */
- public static Long getLong(Map<String, Object> jsonMap, String property) {
- Object value = jsonMap.getOrDefault(property, null);
- Long longVal = TypeUtils.castToLong(value);
-
- if (longVal == null) {
- return 0L;
- }
-
- return longVal;
- }
-
- public static String getString(Map<String, Object> jsonMap, String property) {
- Object value = jsonMap.getOrDefault(property, null);
- if (value == null) {
- return null;
- }
-
- if (value instanceof Map) {
- return JsonMapper.toJsonString(value);
- }
-
- if (value instanceof List) {
- return JsonMapper.toJsonString(value);
- }
-
- return value.toString();
- }
-
- /**
- * Update a property value
- *
- * @param jsonMap original log json map
- * @param property key to update
- * @param value new value
- */
- public static void setValue(Map<String, Object> jsonMap, String property, Object value) {
- try {
- jsonMap.put(property, value);
- } catch (RuntimeException e) {
- logger.error("The JSON set value is abnormal,the error message is :", e);
- }
- }
-
- /**
* Fetch the gateway schema (a String) and parse it into the cached maps used to generate an Object-typed instance
* i.e. the map collection used for reflectively building schema-typed objects
*/
@@ -170,12 +95,6 @@ public class JsonParseUtil {
clearCacheMap();
DocumentContext parse = JsonPath.parse(schema);
- List<Object> actions = parse.read("$.doc.action[*]");
- for (Object action : actions) {
- actionMap.put(JsonPath.read(action, "$.label"),
- JsonPath.read(action, "$.metrics").toString().split(StreamAggregateConfig.FORMAT_SPLITTER));
- }
-
List<Object> metricFunctions = parse.read("$.doc.metrics[*]");
for (Object metric : metricFunctions) {
metricFunctionsMap.put(JsonPath.read(metric, "$.name"),
@@ -208,27 +127,10 @@ public class JsonParseUtil {
transformsList.add(new String[]{function, name, fieldName, parameters});
}
- List<Object> hierarchyList = parse.read("$.doc.transforms[*]");
- for (Object transform : hierarchyList) {
- String function = JsonPath.read(transform, "$.function").toString();
- if ("hierarchy".equals(function)) {
- String name = JsonPath.read(transform, "$.name").toString();
- String parameters = JsonPath.read(transform, "$.parameters").toString();
- hierarchy = new String[]{name, parameters};
- }
- }
-
resultTimeKey = JsonPath.read(schema, "$.doc.timestamp.name");
}
/**
- * @return actions map parsed from the schema
- */
- public static HashMap<String, String[]> getActionMap() {
- return actionMap;
- }
-
- /**
* @return metric aggregation functions parsed from the schema
*/
public static HashMap<String, String[]> getMetricFunctionsMap() {
@@ -264,13 +166,6 @@ public class JsonParseUtil {
}
/**
- * @return hierarchy split function parsed from the schema
- */
- public static String[] getHierarchy() {
- return hierarchy;
- }
-
- /**
* @return time-field key parsed from the schema
*/
public static String getResultTimeKey() {
@@ -281,7 +176,6 @@ public class JsonParseUtil {
* Clear the caches when the config changes, then re-parse the schema to refresh them
*/
private static void clearCacheMap() {
- actionMap.clear();
metricFunctionsMap.clear();
dimensionsMap.clear();
filtersMap.clear();
diff --git a/src/test/java/com/zdjizhi/DatasketchesTest.java b/src/test/java/com/zdjizhi/DatasketchesTest.java
new file mode 100644
index 0000000..2b4b3a6
--- /dev/null
+++ b/src/test/java/com/zdjizhi/DatasketchesTest.java
@@ -0,0 +1,254 @@
+package com.zdjizhi;
+
+import cn.hutool.json.JSONUtil;
+import com.alibaba.fastjson2.*;
+import com.zdjizhi.utils.JsonMapper;
+import org.apache.datasketches.hll.HllSketch;
+import org.apache.datasketches.hll.Union;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Test;
+
+import java.util.*;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi
+ * @Description:
+ * @date 2023/3/2 17:17
+ */
+public class DatasketchesTest {
+
+ @Test
+ public void HllSketchTest() {
+ HashSet<String> strings = new HashSet<>();
+
+ HllSketch sketch = new HllSketch(12);
+
+ for (int i = 0; i < 50; i++) {
+ String ip = "192.168.1." + i;
+ sketch.update(ip);
+ strings.add(ip);
+ }
+
+ System.out.println(sketch.getEstimate() + "--" + strings.size());
+
+ HashSet<String> randomStrings = new HashSet<>();
+
+ HllSketch randomSketch = new HllSketch(12);
+
+ for (int i = 0; i < 50; i++) {
+ String ip = makeIPv4Random();
+ randomSketch.update(ip);
+ randomStrings.add(ip);
+ }
+
+ System.out.println(randomSketch.getEstimate() + "--" + randomStrings.size());
+ }
+
+ @Test
+ public void HllSketchUnionTest() {
+ HashSet<String> strings = new HashSet<>();
+
+ HllSketch sketch = new HllSketch(12);
+
+ for (int i = 0; i < 50; i++) {
+ String ip = "192.168.1." + i;
+ sketch.update(ip);
+ strings.add(ip);
+ }
+
+ HllSketch sketch2 = new HllSketch(12);
+
+ for (int i = 0; i < 10; i++) {
+ String ip = "192.168.2." + i;
+ sketch2.update(ip);
+ strings.add(ip);
+ }
+
+ Union union = new Union(12);
+
+ union.update(sketch);
+ union.update(sketch2);
+ HllSketch sketch_result = HllSketch.heapify(union.getResult().toCompactByteArray());
+
+ System.out.println(sketch.getEstimate() + "--" + strings.size());
+ System.out.println(sketch2.getEstimate() + "--" + strings.size());
+ System.out.println(sketch_result.getEstimate() + "--" + strings.size());
+ }
+
+ @Test
+ public void HllSketchDruidTest() {
+ HashMap<String, Object> dataMap = new HashMap<>();
+
+ HashSet<String> strings = new HashSet<>();
+
+ HllSketch sketch = new HllSketch(12);
+
+ for (int i = 0; i < 50; i++) {
+ String ip = "192.168.1." + i;
+ sketch.update(ip);
+ strings.add(ip);
+ }
+
+ HllSketch sketch2 = new HllSketch(12);
+
+ for (int i = 0; i < 10; i++) {
+ String ip = "192.168.2." + i;
+ sketch2.update(ip);
+ strings.add(ip);
+ }
+
+ Union union = new Union(12);
+
+ union.update(sketch);
+ union.update(sketch2);
+ HllSketch sketch_result1 = HllSketch.heapify(union.getResult().toCompactByteArray());
+
+ HllSketch sketch3 = new HllSketch(12);
+
+ for (int i = 0; i < 10; i++) {
+ String ip = "192.168.3." + i;
+ sketch3.update(ip);
+ strings.add(ip);
+ }
+
+ Union union2 = new Union(12);
+
+ union2.update(sketch_result1);
+ union2.update(sketch3);
+ HllSketch sketch_result2 = HllSketch.heapify(union2.getResult().toCompactByteArray());
+
+ System.out.println(sketch.getEstimate() + "--" + strings.size());
+ System.out.println(sketch2.getEstimate() + "--" + strings.size());
+ System.out.println(sketch3.getEstimate() + "--" + strings.size());
+ System.out.println(sketch_result1.getEstimate() + "--" + strings.size());
+ System.out.println(sketch_result2.getEstimate() + "--" + strings.size());
+
+ Result result = new Result();
+ result.setC2s_pkt_num(10);
+ result.setS2c_pkt_num(10);
+ result.setC2s_byte_num(10);
+ result.setS2c_byte_num(10);
+ result.setStat_time(1679970031);
+ result.setSchema_type("HLLSketchMergeTest");
+
+ //CompactByte
+ result.setIp_object(sketch_result2.toCompactByteArray());
+// System.out.println(result.toString());
+        // sendMessage(JsonMapper.toJsonString(result));
+
+
+ //UpdatableByte
+ result.setIp_object(sketch_result2.toUpdatableByteArray());
+// System.out.println(result.toString());
+        // sendMessage(JsonMapper.toJsonString(result));
+
+ //Hashmap
+ dataMap.put("app_name", "TEST");
+ dataMap.put("protocol_stack_id", "HTTP");
+ dataMap.put("vsys_id", 1);
+ dataMap.put("stat_time", 1681370100);
+ dataMap.put("client_ip_sketch", sketch_result2.toUpdatableByteArray());
+
+ System.out.println("Jackson:" + JsonMapper.toJsonString(dataMap));
+ System.out.println("FastJson2:" + JSONObject.toJSONString(dataMap));
+ System.out.println("Hutool:" + JSONUtil.toJsonStr(dataMap) + "\n\n");
+
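+        // JSONB is FastJson2's binary encoding; it can carry the raw byte[] sketch directly, without a Base64 text step.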
+ byte[] toJSONB = JSONB.toBytes(dataMap);
+// sendMessage(toJSONB);
+ JSONObject jsonObject = JSONB.parseObject(toJSONB);
+ System.out.println("FastJson2 Byte(JSONB):" + jsonObject.toJSONString() + "\n\n");
+
+
+ dataMap.put("client_ip_sketch", Base64.getEncoder().encodeToString(sketch_result2.toUpdatableByteArray()));
+ System.out.println("FastJson2 Byte(Base64):" + JSONObject.toJSONString(dataMap));
+        System.out.println("Hutool Byte(Base64):" + JSONUtil.toJsonStr(dataMap));
+
+
+// sendMessage(JSONObject.toJSONString(dataMap));
+ }
+
+
+    // Generates a random IPv4 address
+    private static String makeIPv4Random() {
+        Random random = new Random();
+        int v4_1 = random.nextInt(255) + 1;
+        int v4_2 = random.nextInt(255);
+        int v4_3 = random.nextInt(255);
+        int v4_4 = random.nextInt(255);
+        return v4_1 + "." + v4_2 + "." + v4_3 + "." + v4_4;
+    }
+
+ private static void sendMessage(Object message) {
+ Properties props = new Properties();
+        // Kafka broker address
+ props.put("bootstrap.servers", "192.168.44.12:9092");
+ props.put("acks", "all");
+ props.put("retries", 0);
+ props.put("linger.ms", 1);
+ props.put("buffer.memory", 67108864);
+// props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+// props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
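+        // NOTE: ByteArraySerializer requires byte[] payloads; to send the JSON strings in the commented calls above, switch back to the StringSerializer lines.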
+ KafkaProducer<String, Object> kafkaProducer = new KafkaProducer<String, Object>(props);
+
+ kafkaProducer.send(new ProducerRecord<String, Object>("TRAFFIC-PROTOCOL-TEST", message));
+
+ kafkaProducer.close();
+ }
+}
+
+class Result {
+
+ private String schema_type;
+ private long c2s_byte_num;
+ private long c2s_pkt_num;
+ private long s2c_byte_num;
+ private long s2c_pkt_num;
+ private long stat_time;
+ private byte[] ip_object;
+
+ public void setSchema_type(String schema_type) {
+ this.schema_type = schema_type;
+ }
+
+ public void setC2s_byte_num(long c2s_byte_num) {
+ this.c2s_byte_num = c2s_byte_num;
+ }
+
+ public void setC2s_pkt_num(long c2s_pkt_num) {
+ this.c2s_pkt_num = c2s_pkt_num;
+ }
+
+ public void setS2c_byte_num(long s2c_byte_num) {
+ this.s2c_byte_num = s2c_byte_num;
+ }
+
+ public void setS2c_pkt_num(long s2c_pkt_num) {
+ this.s2c_pkt_num = s2c_pkt_num;
+ }
+
+ public void setStat_time(long stat_time) {
+ this.stat_time = stat_time;
+ }
+
+ public void setIp_object(byte[] ip_object) {
+ this.ip_object = ip_object;
+ }
+
+ @Override
+ public String toString() {
+ return "Result{" +
+ "schema_type='" + schema_type + '\'' +
+ ", c2s_byte_num=" + c2s_byte_num +
+ ", c2s_pkt_num=" + c2s_pkt_num +
+ ", s2c_byte_num=" + s2c_byte_num +
+ ", s2c_pkt_num=" + s2c_pkt_num +
+ ", stat_time=" + stat_time +
+ ", ip_object=" + Arrays.toString(ip_object) +
+ '}';
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/zdjizhi/FunctionTest.java b/src/test/java/com/zdjizhi/FunctionTest.java
new file mode 100644
index 0000000..847dd22
--- /dev/null
+++ b/src/test/java/com/zdjizhi/FunctionTest.java
@@ -0,0 +1,70 @@
+package com.zdjizhi;
+
+import com.jayway.jsonpath.JsonPath;
+import com.zdjizhi.common.StreamAggregateConfig;
+import com.zdjizhi.utils.StringUtil;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi
+ * @Description:
+ * @date 2023/1/6 17:54
+ */
+public class FunctionTest {
+ public static void main(String[] args) {
+ String groupKey = "ETHERNET.IPv4.TCP.UNCATEGORIZED.qq_r2@4";
+ String protocol = groupKey.substring(0, groupKey.indexOf("@"));
+ System.out.println(protocol);
+ StringBuffer stringBuffer = new StringBuffer();
+ String appName = "qq_r2";
+ String[] protocolIds = protocol.split(StreamAggregateConfig.PROTOCOL_SPLITTER);
+ for (String proto : protocolIds) {
+ if (StringUtil.isBlank(stringBuffer.toString())) {
+ stringBuffer.append(proto);
+ System.out.println(stringBuffer.toString());
+ } else {
+ stringBuffer.append(".").append(proto);
+ if (proto.equals(appName)) {
+ System.out.println(stringBuffer.toString() + "---" + appName);
+ } else {
+ System.out.println(stringBuffer.toString());
+ }
+ }
+ }
+ }
+
+ @Test
+ public void JsonPathTest() {
+ String json = "{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-7400\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-7400\"}]}";
+ String expr = "$.tags[?(@.tag=='data_center')].value";
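+        // A filter ([?(...)]) makes the path indefinite, so read() returns a List; toString() gives e.g. ["center-xxg-7400"].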
+ Object read = JsonPath.parse(json).read(expr).toString();
+ System.out.println(read);
+ }
+
+ @Test
+ public void SplitTest() {
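+        // String.split takes a regex, so a literal dot must be escaped ("\\.") or wrapped in a character class ("[.]"); both forms below are equivalent.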
+ String str = "[.]";
+ String protocol = "ETHERNET.IPv4.TCP.http.test";
+
+ System.out.println(Arrays.toString(protocol.split(str)));
+
+ String str2 = "\\.";
+ System.out.println(Arrays.toString(protocol.split(str2)));
+
+        // Split once up front instead of re-splitting the string on every iteration.
+        String[] parts = protocol.split(str);
+        StringBuilder stringBuilder = new StringBuilder();
+        for (int i = 0; i < parts.length - 1; i++) {
+            String value = parts[i];
+            if (StringUtil.isBlank(stringBuilder.toString())) {
+                stringBuilder.append(value);
+            } else {
+                stringBuilder.append(".").append(value);
+            }
+            System.out.println(stringBuilder.toString());
+        }
+ }
+}
diff --git a/src/test/java/com/zdjizhi/FunctionsTest.java b/src/test/java/com/zdjizhi/FunctionsTest.java
deleted file mode 100644
index 6e3a20b..0000000
--- a/src/test/java/com/zdjizhi/FunctionsTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package com.zdjizhi;
-
-import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.json.JsonParseUtil;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi
- * @Description:
- * @date 2021/9/17 14:22
- */
-public class FunctionsTest {
- private static HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricsMap();
-
- @Test
- public void actionTest() {
- HashMap<String, String[]> actionMap = JsonParseUtil.getActionMap();
- String[] metricNames = actionMap.getOrDefault("", actionMap.get("Default"));
- System.out.println(actionMap.toString());
- System.out.println(Arrays.toString(metricNames));
-
-
-
- }
-
-
-
-}
diff --git a/src/test/java/com/zdjizhi/JsonTest.java b/src/test/java/com/zdjizhi/JsonTest.java
new file mode 100644
index 0000000..dd0cf97
--- /dev/null
+++ b/src/test/java/com/zdjizhi/JsonTest.java
@@ -0,0 +1,145 @@
+package com.zdjizhi;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import net.sf.cglib.beans.BeanGenerator;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi
+ * @Description:
+ * @date 2023/4/12 18:08
+ */
+public class JsonTest {
+
+// /**
+//     * Map of field name to class, loaded in memory for reflective class generation
+// */
+// private static HashMap<String, Class> map = MetaDataParse.getMapFromHttp(FlowWriteConfig.SCHEMA_HTTP);
+//
+// /**
+//     * Object reflectively generated from that map
+// */
+// private static Object mapObject = MetaDataParse.generateObject(map);
+
+ @Test
+ public void fastJson2Test() {
+// SerializerFeature.DisableCircularReferenceDetect
+// SerializerFeature.WriteNullStringAsEmpty
+// SerializerFeature.WriteNullNumberAsZero
+ HashMap<String, Class> classHashMap = new HashMap<>();
+
+ String message = "{\"common_schema_type\":\"HTTP\",\"common_sessions\":1,\"http_request_line\":\"GET sampleFile.html HTTP/1.1\",\"http_host\":\"www.texaslotto.com\",\"http_url\":\"www.texaslotto.com/sampleFile.html\",\"http_user_agent\":\"xPTS/2.0\",\"http_response_line\":\"HTTP/1.1 200 OK\",\"http_isn\":1953597368,\"http_proxy_flag\":0,\"http_version\":\"http1\",\"http_response_latency_ms\":1,\"http_session_duration_ms\":2,\"http_response_content_type\":\"text/html\",\"http_sequence\":80,\"common_protocol_label\":\"ETHERNET.IPv4.UDP.GTP.IPv4.TCP\",\"common_c2s_byte_diff\":17200,\"common_c2s_pkt_diff\":120,\"common_s2c_byte_diff\":16490,\"common_s2c_pkt_diff\":81,\"common_c2s_ipfrag_num\":0,\"common_s2c_ipfrag_num\":0,\"common_first_ttl\":64,\"common_c2s_tcp_unorder_num\":0,\"common_s2c_tcp_unorder_num\":0,\"common_c2s_tcp_lostlen\":0,\"common_s2c_tcp_lostlen\":0,\"common_c2s_pkt_retrans\":0,\"common_s2c_pkt_retrans\":0,\"common_c2s_byte_retrans\":0,\"common_s2c_byte_retrans\":0,\"common_flags\":24720,\"common_flags_identify_info\":\"{\\\"Server is Local\\\":1,\\\"Inbound\\\":201,\\\"C2S\\\":1,\\\"S2C\\\":2}\",\"common_direction\":73,\"common_app_full_path\":\"http\",\"common_app_label\":\"http\",\"common_tcp_client_isn\":1953597368,\"common_tcp_server_isn\":1950649408,\"common_server_ip\":\"192.50.199.25\",\"common_client_ip\":\"192.50.146.197\",\"common_server_port\":80,\"common_client_port\":22533,\"common_stream_dir\":3,\"common_address_type\":4,\"common_address_list\":\"IPv4_TCP<22533-80-192.50.146.197-192.50.199.25>|GTP<111001144-851056526>|IPv4_UDP<2152-2152-192.50.235.220-192.50.135.83>|MAC<000c299b2fa4-000c2915b4f4>\",\"common_start_time\":1680475247,\"common_end_time\":1680475247,\"common_con_duration_ms\":23,\"common_s2c_pkt_num\":81,\"common_s2c_byte_num\":16490,\"common_c2s_pkt_num\":120,\"common_c2s_byte_num\":17200,\"common_establish_latency_ms\":2,\"common_client_location\":\"日本.Unknown.Unknown\",\"common_server_location\":\"日本.Unknown.Unknown\",\"common_service_category\":[6223,6219,5093,5089],\"common_apn\":\"cmiott.owflr.mcto60g.com\",\"common_imsi\":\"460045157091460\",\"common_imei\":\"8626070583005833\",\"common_phone_number\":\"861440152028973\",\"common_tunnel_endpoint_a_desc\":\"test_50_gtp\",\"common_tunnel_endpoint_b_desc\":\"test_50_gtp\",\"common_tunnels\":[{\"tunnels_schema_type\":\"GTP\",\"gtp_a2b_teid\":111001144,\"gtp_b2a_teid\":851056526,\"gtp_endpoint_a_ip\":\"192.50.235.220\",\"gtp_endpoint_b_ip\":\"192.50.135.83\",\"gtp_endpoint_a_port\":2152,\"gtp_endpoint_b_port\":2152},{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\",\"c2s_source_mac\":\"00:0c:29:9b:2f:a4\",\"c2s_destination_mac\":\"00:0c:29:15:b4:f4\",\"s2c_source_mac\":\"00:0c:29:15:b4:f4\",\"s2c_destination_mac\":\"00:0c:29:9b:2f:a4\"}],\"common_stream_trace_id\":\"578829229323951427\",\"common_l4_protocol\":\"IPv4_TCP\",\"common_sled_ip\":\"192.168.40.161\",\"common_device_id\":\"unknown\",\"common_device_tag\":\"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-7400\\\"},{\\\"tag\\\":\\\"data_center\\\",\\\"value\\\":\\\"center-xxg-7400\\\"}]}\",\"common_t_vsys_id\":1,\"common_policy_id\":0,\"common_service\":2,\"common_action\":0,\"common_vsys_id\":1}";
+
+ JSONObject json = JSON.parseObject(message);
+ Object mapObject = generateObject(classHashMap);
+ Object object = JSON.parseObject(message, mapObject.getClass());
+
+
+ System.out.println(json.get("common_schema_type"));
+ json.put("common_schema_type", "SSH");
+
+
+ System.out.println(json.toJSONString());
+
+
+ }
+
+ private static Class getClassName(String type) {
+ Class clazz;
+
+ switch (type) {
+ case "int":
+ clazz = Integer.class;
+ break;
+ case "string":
+ clazz = String.class;
+ break;
+ case "long":
+ clazz = long.class;
+ break;
+ case "array":
+ clazz = List.class;
+ break;
+ case "double":
+ clazz = double.class;
+ break;
+ case "float":
+ clazz = float.class;
+ break;
+ case "char":
+ clazz = char.class;
+ break;
+ case "byte":
+ clazz = byte.class;
+ break;
+ case "boolean":
+ clazz = boolean.class;
+ break;
+ case "short":
+ clazz = short.class;
+ break;
+ default:
+ clazz = String.class;
+ }
+ return clazz;
+ }
+
+ /**
+     * Generates an object via reflection (cglib BeanGenerator).
+     *
+     * @param properties map of property name to class used to build the bean
+     * @return the generated Object instance
+ */
+ private static Object generateObject(Map properties) {
+ BeanGenerator generator = new BeanGenerator();
+ Set keySet = properties.keySet();
+ for (Object aKeySet : keySet) {
+ String key = (String) aKeySet;
+ generator.addProperty(key, (Class) properties.get(key));
+ }
+ return generator.create();
+ }
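+
+    /**
+     * Minimal usage sketch for generateObject (added illustration; the property
+     * name is taken from the sample message above): build a one-property bean
+     * and let FastJson2 populate it from JSON.
+     */
+    @Test
+    public void generateObjectUsageSketch() {
+        HashMap<String, Class> props = new HashMap<>();
+        props.put("common_schema_type", String.class);
+        Object bean = generateObject(props);
+        // FastJson2 binds matching JSON keys onto the generated bean's properties.
+        Object parsed = JSON.parseObject("{\"common_schema_type\":\"HTTP\"}", bean.getClass());
+        System.out.println(JSON.toJSONString(parsed));
+    }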
+
+// /**
+//     * Builds, from the gateway schema string, the map used to reflectively
+//     * generate a schema-typed object.
+//     *
+//     * @param schema gateway schema content
+//     * @return map of field name to class for reflective object generation
+// */
+// public static HashMap<String, Class> getMapFromHttp(String schema) {
+// HashMap<String, Class> map = new HashMap<>(16);
+//
+// DocumentContext parse = JsonPath.parse(schema);
+//
+//        // Extract "fields" and convert it to an array; each element holds a name, doc and type
+//        com.alibaba.fastjson.JSONObject schemaJson = com.alibaba.fastjson.JSON.parseObject(schema);
+// JSONArray fields = (JSONArray) schemaJson.get("fields");
+//
+// for (Object field : fields) {
+// String filedStr = field.toString();
+// if (checkKeepField(filedStr)) {
+// String name = JsonPath.read(filedStr, "$.name").toString();
+// String type = JsonPath.read(filedStr, "$.type").toString();
+// if (type.contains("{")) {
+// type = JsonPath.read(filedStr, "$.type.type").toString();
+// }
+//                // Assemble the map used to generate the entity class
+// map.put(name, getClassName(type));
+// } else {
+// dropList.add(filedStr);
+// }
+// }
+// return map;
+// }
+}