summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2021-09-27 11:13:25 +0800
committerqidaijie <[email protected]>2021-09-27 11:13:25 +0800
commit99bf45cdbc2186d37b9a3876bb3b4ab2b2620c3f (patch)
tree14342d6cd2ce86b73b51b8dcfe88e0c1571d9b86
parentdd31a0bf9612f93f8d1350f50b9f48456fb94c9a (diff)
提交2109版livecharts
-rw-r--r--pom.xml240
-rw-r--r--properties/default_config.properties29
-rw-r--r--properties/service_flow_config.properties51
-rw-r--r--src/main/java/com/zdjizhi/common/StreamAggregateConfig.java56
-rw-r--r--src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java56
-rw-r--r--src/main/java/com/zdjizhi/utils/exception/AnalysisException.java18
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/CountWindowFunction.java105
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java17
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java19
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java122
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/MyTimeAssigner.java22
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java43
-rw-r--r--src/main/java/com/zdjizhi/utils/general/MetricFunctions.java37
-rw-r--r--src/main/java/com/zdjizhi/utils/general/ParseFunctions.java97
-rw-r--r--src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java77
-rw-r--r--src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java357
-rw-r--r--src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java142
-rw-r--r--src/main/java/com/zdjizhi/utils/json/TypeUtils.java180
-rw-r--r--src/main/java/com/zdjizhi/utils/kafka/CertUtils.java36
-rw-r--r--src/main/java/com/zdjizhi/utils/kafka/Consumer.java42
-rw-r--r--src/main/java/com/zdjizhi/utils/kafka/Producer.java53
-rw-r--r--src/main/java/com/zdjizhi/utils/system/StreamAggregateConfigurations.java70
-rw-r--r--src/main/java/log4j.properties25
-rw-r--r--src/test/java/com/zdjizhi/FunctionsTest.java33
24 files changed, 1927 insertions, 0 deletions
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..f437b34
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,240 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.zdjizhi</groupId>
+ <artifactId>log-olap-analysis-schema</artifactId>
+ <version>210908-security</version>
+
+ <name>log-olap-analysis-schema</name>
+ <url>http://www.example.com</url>
+
+
+ <repositories>
+ <repository>
+ <id>nexus</id>
+ <name>Team Nexus Repository</name>
+ <url>http://192.168.40.125:8099/content/groups/public</url>
+ </repository>
+
+ <repository>
+ <id>maven-ali</id>
+ <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
+ <releases>
+ <!--<enabled>true</enabled>-->
+ </releases>
+ <snapshots>
+ <!--<enabled>true</enabled>-->
+ <checksumPolicy>fail</checksumPolicy>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <flink.version>1.13.1</flink.version>
+ <hadoop.version>2.7.1</hadoop.version>
+ <kafka.version>1.0.0</kafka.version>
+ <hbase.version>2.2.3</hbase.version>
+ <scope.type>provided</scope.type>
+ <!--<scope.type>compile</scope.type>-->
+ </properties>
+
+ <build>
+ <plugins>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.4.2</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>com.zdjizhi.topology.StreamAggregateTopology</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>io.github.zlika</groupId>
+ <artifactId>reproducible-build-maven-plugin</artifactId>
+ <version>0.2</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>strip-jar</goal>
+ </goals>
+ <phase>package</phase>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ <resources>
+ <resource>
+ <directory>properties</directory>
+ <includes>
+ <include>**/*.properties</include>
+ <include>**/*.xml</include>
+ </includes>
+ <filtering>false</filtering>
+ </resource>
+
+ <resource>
+ <directory>src\main\java</directory>
+ <includes>
+ <include>log4j.properties</include>
+ </includes>
+ <filtering>false</filtering>
+ </resource>
+ </resources>
+ </build>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>com.zdjizhi</groupId>
+ <artifactId>galaxy</artifactId>
+ <version>1.0.6</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table -->
+ <!--<dependency>-->
+ <!--<groupId>org.apache.flink</groupId>-->
+ <!--<artifactId>flink-table</artifactId>-->
+ <!--<version>${flink.version}</version>-->
+ <!--<type>pom</type>-->
+ <!--<scope>${scope.type}</scope>-->
+ <!--</dependency>-->
+
+ <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope.type}</scope>
+ </dependency>
+
+
+ <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.12</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope.type}</scope>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_2.12</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope.type}</scope>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka_2.12</artifactId>
+ <version>${flink.version}</version>
+ <!--<scope>${scope.type}</scope>-->
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope.type}</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>cglib</groupId>
+ <artifactId>cglib-nodep</artifactId>
+ <version>3.2.4</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <version>5.3.2</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.5.2</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.jayway.jsonpath</groupId>
+ <artifactId>json-path</artifactId>
+ <version>2.4.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.prometheus</groupId>
+ <artifactId>simpleclient_pushgateway</artifactId>
+ <version>0.9.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>cn.hutool</groupId>
+ <artifactId>hutool-all</artifactId>
+ <version>5.5.2</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.21</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.21</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+</project>
+
diff --git a/properties/default_config.properties b/properties/default_config.properties
new file mode 100644
index 0000000..2b9bfb1
--- /dev/null
+++ b/properties/default_config.properties
@@ -0,0 +1,29 @@
+#producer���ԵĴ�������
+retries=0
+
+#���ĺ������˵һ��Batch������֮��������ã��������Batch��û��д���������뷢�ͳ�ȥ��
+linger.ms=5
+
+#����ڳ�ʱ֮ǰδ�յ���Ӧ���ͻ��˽��ڱ�Ҫʱ���·�������
+request.timeout.ms=30000
+
+#producer���ǰ���batch���з��͵�,���δ�С��Ĭ��:16384
+batch.size=262144
+
+#Producer�����ڻ�����Ϣ�Ļ�������С
+buffer.memory=67108864
+
+#�������������ÿ�η��͸�Kafka���������������С,Ĭ��1048576
+max.request.size=5242880
+
+#kafka SASL��֤�û���
+kafka.user=admin
+
+#kafka SASL��SSL��֤����
+kafka.pin=galaxy2019
+
+#kafka source protocol; SSL or SASL
+kafka.source.protocol=SASL
+
+#kafka sink protocol; SSL or SASL
+kafka.sink.protocol=SASL \ No newline at end of file
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
new file mode 100644
index 0000000..91f018d
--- /dev/null
+++ b/properties/service_flow_config.properties
@@ -0,0 +1,51 @@
+#--------------------------------地址配置------------------------------#
+
+#管理kafka地址
+input.kafka.servers=192.168.44.12:9094
+
+#管理输出kafka地址
+output.kafka.servers=192.168.44.12:9094
+
+#--------------------------------HTTP------------------------------#
+#kafka 证书地址
+tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
+
+#网关的schema位置
+#schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/liveChart_interim
+schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/liveChart_session
+
+#网关APP_ID 获取接口
+app.id.http=http://192.168.44.67:9999/open-api/appDicList
+
+#--------------------------------Kafka消费组信息------------------------------#
+
+#kafka 接收数据topic
+input.kafka.topic=test
+#input.kafka.topic=SESSION-RECORD
+#input.kafka.topic=INTERIM-SESSION-RECORD
+
+#补全数据 输出 topic
+output.kafka.topic=test-result
+
+#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
+group.id=liveCharts-session-test-20210811-1
+
+#生产者压缩模式 none or snappy
+producer.kafka.compression.type=none
+
+#生产者ack
+producer.ack=1
+
+#--------------------------------topology配置------------------------------#
+
+#consumer 并行度
+consumer.parallelism=1
+
+#map函数并行度
+parse.parallelism=1
+
+#app_id 更新时间,如填写0则不更新缓存
+app.tick.tuple.freq.secs=0
+
+#聚合窗口时间
+count.window.time=15 \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java
new file mode 100644
index 0000000..3647c5a
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java
@@ -0,0 +1,56 @@
+package com.zdjizhi.common;
+
+
+import com.zdjizhi.utils.system.StreamAggregateConfigurations;
+
+/**
+ * @author Administrator
+ */
+public class StreamAggregateConfig {
+
+ public static final String FORMAT_SPLITTER = ",";
+ public static final String PROTOCOL_SPLITTER = "\\.";
+
+ /**
+ * System
+ */
+ public static final Integer CONSUMER_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "consumer.parallelism");
+ public static final Integer PARSE_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "parse.parallelism");
+ public static final Integer APP_TICK_TUPLE_FREQ_SECS = StreamAggregateConfigurations.getIntProperty(0, "app.tick.tuple.freq.secs");
+ public static final Integer COUNT_WINDOW_TIME = StreamAggregateConfigurations.getIntProperty(0, "count.window.time");
+ public static final String TOOLS_LIBRARY = StreamAggregateConfigurations.getStringProperty(0, "tools.library");
+
+ /**
+ * kafka source
+ */
+ public static final String INPUT_KAFKA_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "input.kafka.servers");
+ public static final String OUTPUT_KAFKA_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "output.kafka.servers");
+ public static final String GROUP_ID = StreamAggregateConfigurations.getStringProperty(0, "group.id");
+ public static final String OUTPUT_KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "output.kafka.topic");
+ public static final String INPUT_KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "input.kafka.topic");
+ public static final String PRODUCER_ACK = StreamAggregateConfigurations.getStringProperty(0, "producer.ack");
+ public static final String KAFKA_SOURCE_PROTOCOL = StreamAggregateConfigurations.getStringProperty(1, "kafka.source.protocol");
+ public static final String KAFKA_SINK_PROTOCOL = StreamAggregateConfigurations.getStringProperty(1, "kafka.sink.protocol");
+ public static final String KAFKA_USER = StreamAggregateConfigurations.getStringProperty(1, "kafka.user");
+ public static final String KAFKA_PIN = StreamAggregateConfigurations.getStringProperty(1, "kafka.pin");
+ public static final String RETRIES = StreamAggregateConfigurations.getStringProperty(1, "retries");
+ public static final String LINGER_MS = StreamAggregateConfigurations.getStringProperty(1, "linger.ms");
+ public static final Integer REQUEST_TIMEOUT_MS = StreamAggregateConfigurations.getIntProperty(1, "request.timeout.ms");
+ public static final Integer BATCH_SIZE = StreamAggregateConfigurations.getIntProperty(1, "batch.size");
+ public static final Integer BUFFER_MEMORY = StreamAggregateConfigurations.getIntProperty(1, "buffer.memory");
+ public static final Integer MAX_REQUEST_SIZE = StreamAggregateConfigurations.getIntProperty(1, "max.request.size");
+
+
+ /**
+ * kafka限流配置-20201117
+ */
+ public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = StreamAggregateConfigurations.getStringProperty(0, "producer.kafka.compression.type");
+
+ /**
+ * http
+ */
+ public static final String SCHEMA_HTTP = StreamAggregateConfigurations.getStringProperty(0, "schema.http");
+ public static final String APP_ID_HTTP = StreamAggregateConfigurations.getStringProperty(0, "app.id.http");
+
+
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java
new file mode 100644
index 0000000..301d862
--- /dev/null
+++ b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java
@@ -0,0 +1,56 @@
+package com.zdjizhi.topology;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.StreamAggregateConfig;
+import com.zdjizhi.utils.functions.*;
+import com.zdjizhi.utils.kafka.Consumer;
+import com.zdjizhi.utils.kafka.Producer;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.WindowedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.topology
+ * @Description:
+ * @date 2021/5/2016:42
+ */
+public class StreamAggregateTopology {
+ private static final Log logger = LogFactory.get();
+
+ public static void main(String[] args) {
+ try {
+ final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ environment.enableCheckpointing(5000);
+
+ DataStream<String> streamSource = environment.addSource(Consumer.getKafkaConsumer())
+ .setParallelism(StreamAggregateConfig.CONSUMER_PARALLELISM);
+
+ SingleOutputStreamOperator<Tuple3<String, String, String>> parseDataMap = streamSource.map(new MapParseFunction()).name("ParseDataMap")
+ .setParallelism(StreamAggregateConfig.PARSE_PARALLELISM);
+
+ WindowedStream<Tuple3<String, String, String>, String, TimeWindow> window = parseDataMap.keyBy(new KeyByFunction())
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.COUNT_WINDOW_TIME)));
+
+ SingleOutputStreamOperator<String> metricCountWindow = window.process(new CountWindowFunction()).name("MetricCountWindow")
+ .setParallelism(StreamAggregateConfig.PARSE_PARALLELISM);
+
+ metricCountWindow.flatMap(new ResultFlatMapFunction()).name("ResultFlatMap").setParallelism(StreamAggregateConfig.PARSE_PARALLELISM)
+ .addSink(Producer.getKafkaProducer()).name("LogSinkKafka").setParallelism(StreamAggregateConfig.PARSE_PARALLELISM);
+
+ environment.execute(args[0]);
+ } catch (Exception e) {
+ logger.error("This Flink task start ERROR! Exception information is :" + e);
+ }
+
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/exception/AnalysisException.java b/src/main/java/com/zdjizhi/utils/exception/AnalysisException.java
new file mode 100644
index 0000000..ad251a2
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/exception/AnalysisException.java
@@ -0,0 +1,18 @@
+package com.zdjizhi.utils.exception;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.storm.utils.execption
+ * @Description:
+ * @date 2021/3/259:42
+ */
+public class AnalysisException extends RuntimeException {
+
+ public AnalysisException() {
+ }
+
+ public AnalysisException(String message) {
+ super(message);
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/CountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/CountWindowFunction.java
new file mode 100644
index 0000000..5f22a6b
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/CountWindowFunction.java
@@ -0,0 +1,105 @@
+package com.zdjizhi.utils.functions;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.general.MetricFunctions;
+import com.zdjizhi.utils.json.JsonParseUtil;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/7/2113:55
+ */
+public class CountWindowFunction extends ProcessWindowFunction<Tuple3<String, String, String>, String, String, TimeWindow> {
+ private static final Logger logger = LoggerFactory.getLogger(CountWindowFunction.class);
+
+ private static HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricsMap();
+ private static HashMap<String, String[]> actionMap = JsonParseUtil.getActionMap();
+ private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(32);
+ private static String resultTimeKey = JsonParseUtil.getTimeKey();
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void process(String key, Context context, Iterable<Tuple3<String, String, String>> input, Collector<String> output) {
+ try {
+ for (Tuple3<String, String, String> tuple : input) {
+ String label = tuple.f0;
+ //action中某个协议的所有function,如果没有就默认
+ String[] metricNames = actionMap.getOrDefault(label, actionMap.get("Default"));
+ String dimensions = tuple.f1;
+ String message = tuple.f2;
+ if (StringUtil.isNotBlank(message)){
+ Map<String, Object> dimensionsObj = (Map<String, Object>) JsonMapper.fromJsonString(dimensions, Map.class);
+ Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
+
+ Map<String, Object> cacheMessage = cacheMap.getOrDefault(dimensions, dimensionsObj);
+ for (String name : metricNames) {
+ String[] metrics = metricsMap.get(name);
+ String function = metrics[0];
+ String fieldName = metrics[1];
+ functionSet(function, cacheMessage, name, cacheMessage.get(name), JsonParseUtil.getValue(object, fieldName));
+
+ }
+ cacheMap.put(dimensions, cacheMessage);
+ }
+ }
+
+ if (!cacheMap.isEmpty()) {
+ Long endTime = context.window().getEnd() / 1000;
+
+ for (String countKey : cacheMap.keySet()) {
+ Map<String, Object> resultMap = cacheMap.get(countKey);
+ JsonParseUtil.setValue(resultMap, resultTimeKey, endTime);
+ output.collect(JsonMapper.toJsonString(resultMap));
+ }
+// cacheMap.clear();
+ }
+
+ } catch (RuntimeException e) {
+ logger.error("windows count error,message:" + e);
+ e.printStackTrace();
+ } finally {
+ cacheMap.clear();
+ }
+ }
+
+ /**
+ * 根据schema描述对应字段进行操作的 函数集合
+ *
+ * @param function 函数名称
+ * @param cacheMessage 结果集
+ * @param nameValue 当前值
+ * @param fieldNameValue 新加值
+ */
+ private static void functionSet(String function, Map<String, Object> cacheMessage, String resultName, Object nameValue, Object fieldNameValue) {
+ switch (function) {
+ case "sum":
+ cacheMessage.put(resultName, MetricFunctions.longSum(nameValue, fieldNameValue));
+ break;
+ case "count":
+ cacheMessage.put(resultName, MetricFunctions.count(nameValue));
+ break;
+ case "unique_sip_num":
+ //TODO
+ break;
+ case "unique_cip_num":
+ //TODO
+ break;
+ default:
+ break;
+ }
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
new file mode 100644
index 0000000..de507ad
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
@@ -0,0 +1,17 @@
+package com.zdjizhi.utils.functions;
+
+import com.zdjizhi.utils.StringUtil;
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/5/2715:01
+ */
+public class FilterNullFunction implements FilterFunction<String> {
+ @Override
+ public boolean filter(String message) {
+ return StringUtil.isNotBlank(message);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java b/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java
new file mode 100644
index 0000000..0b00b3c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java
@@ -0,0 +1,19 @@
+package com.zdjizhi.utils.functions;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/7/2112:13
+ */
+public class KeyByFunction implements KeySelector<Tuple3<String, String, String>, String> {
+
+ @Override
+ public String getKey(Tuple3<String, String, String> value) throws Exception {
+ //以map拼接的key分组
+ return value.f1;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java
new file mode 100644
index 0000000..41a6109
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java
@@ -0,0 +1,122 @@
+package com.zdjizhi.utils.functions;
+
+import com.zdjizhi.common.StreamAggregateConfig;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.general.ParseFunctions;
+import com.zdjizhi.utils.json.JsonParseUtil;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/5/2715:01
+ */
+public class MapParseFunction implements MapFunction<String, Tuple3<String, String, String>> {
+ private static final Logger logger = LoggerFactory.getLogger(CountWindowFunction.class);
+
+ private static ArrayList<String[]> jobList = JsonParseUtil.getTransformsList();
+
+ private static HashMap<String, String> dimensionsMap = JsonParseUtil.getDimensionsMap();
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Tuple3<String, String, String> map(String message) {
+ try {
+ if (StringUtil.isNotBlank(message)) {
+ Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
+ Map<String, Object> dimensionsObj = ParseFunctions.transDimensions(dimensionsMap, object);
+ if (ParseFunctions.filterLogs(object)) {
+ for (String[] strings : jobList) {
+ //函数名称
+ String function = strings[0];
+ //需要补全的字段的key
+ String resultKeyName = strings[1];
+ //原始日志字段key
+ String logsKeyName = strings[2];
+ //原始日志字段对应的值
+ Object logsKeyValue = JsonParseUtil.getValue(object, strings[2]);
+ //额外的参数的值
+ String parameters = strings[3];
+
+ switch (function) {
+ case "dismantling":
+ if (StringUtil.isNotBlank(parameters)) {
+ if (logsKeyValue != null) {
+ JsonParseUtil.setValue(message, logsKeyName, dismantlingUtils(parameters, logsKeyValue));
+ }
+ }
+ break;
+ case "combination":
+ if (StringUtil.isNotBlank(parameters)) {
+ if (logsKeyValue != null) {
+ combinationUtils(dimensionsObj, object, parameters, resultKeyName, logsKeyName);
+ }
+ }
+ break;
+ case "hierarchy":
+// collector.emit(new Values(JsonParseUtil.getValue(object, logsKeyName), JsonMapper.toJsonString(dimensionsObj), JsonMapper.toJsonString(object)));
+ return new Tuple3<>(JsonParseUtil.getString(object, logsKeyName), JsonMapper.toJsonString(dimensionsObj), JsonMapper.toJsonString(object));
+ default:
+ break;
+ }
+ }
+ }
+ }
+ } catch (RuntimeException e) {
+ logger.error("Map Parse error,message:" + e);
+ return new Tuple3<>("", "", "");
+ }
+ return new Tuple3<>("", "", "");
+ }
+
+ /**
+ * alignment ID替换操作
+ * 根据缓存中的AppId对应信息,获取当前AppId对应的具体名称。
+ *
+ * @param parameters 参数集
+ * @param fieldName 原始日志列名
+ */
+ private static String dismantlingUtils(String parameters, Object fieldName) {
+ String[] alignmentPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
+ int digits = Integer.parseInt(alignmentPars[0]);
+ return fieldName.toString().split(StreamAggregateConfig.PROTOCOL_SPLITTER)[digits];
+ }
+
+ /**
+ * combination 拼接操作
+ * 获取方法函数中 parameters 字段,结构 "parameters": "abc,/" ;abc为要拼接字段 /为拼接的分隔符
+ *
+ * @param parameters 参数集
+ * @param message 原始日志
+ * @param fieldName 原始日志列名
+ */
+ private static void combinationUtils(Map<String, Object> dimensions, Map<String, Object> message, String parameters, String resultKeyName, String fieldName) {
+ String[] combinationPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
+ Object combinationField = JsonParseUtil.getValue(message, combinationPars[0]);
+ if (combinationField != null) {
+ String separator = combinationPars[1];
+ Object fieldNameValue = JsonParseUtil.getValue(message, fieldName);
+ if (fieldNameValue != null) {
+ String combinationValue = fieldNameValue + separator + combinationField;
+ dimensions.put(resultKeyName, combinationValue);
+ JsonParseUtil.setValue(message, fieldName, combinationValue);
+ } else {
+ dimensions.put(resultKeyName, combinationField);
+ JsonParseUtil.setValue(message, fieldName, combinationField);
+
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/MyTimeAssigner.java b/src/main/java/com/zdjizhi/utils/functions/MyTimeAssigner.java
new file mode 100644
index 0000000..4ba139f
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/MyTimeAssigner.java
@@ -0,0 +1,22 @@
+package com.zdjizhi.utils.functions;
+
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.json.JsonParseUtil;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/7/2117:32
+ */
+public class MyTimeAssigner implements SerializableTimestampAssigner<String> {
+ @Override
+ public long extractTimestamp(String element, long recordTimestamp) {
+ Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(element, Map.class);
+
+ return JsonParseUtil.getLong(object,"common_end_time");
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java
new file mode 100644
index 0000000..d458984
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java
@@ -0,0 +1,43 @@
+package com.zdjizhi.utils.functions;
+
+import com.zdjizhi.common.StreamAggregateConfig;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.json.JsonParseUtil;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/7/2114:52
+ */
+public class ResultFlatMapFunction implements FlatMapFunction<String, String> {
+ private static String[] jobList = JsonParseUtil.getHierarchy();
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void flatMap(String value, Collector out) throws Exception {
+ StringBuffer stringBuffer = new StringBuffer();
+ String name = jobList[0];
+ Map<String, Object> jsonObject = (Map<String, Object>) JsonMapper.fromJsonString(value, Map.class);
+ String protocol = JsonParseUtil.getString(jsonObject, name);
+ if (StringUtil.isNotBlank(protocol)) {
+ String[] protocolIds = protocol.split(StreamAggregateConfig.PROTOCOL_SPLITTER);
+ for (String proto : protocolIds) {
+ if (StringUtil.isBlank(stringBuffer.toString())) {
+ stringBuffer.append(proto);
+ jsonObject.put(name, stringBuffer.toString());
+ out.collect(JsonMapper.toJsonString(jsonObject));
+ } else {
+ stringBuffer.append(jobList[1]).append(proto);
+ jsonObject.put(name, stringBuffer.toString());
+ out.collect(JsonMapper.toJsonString(jsonObject));
+ }
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/general/MetricFunctions.java b/src/main/java/com/zdjizhi/utils/general/MetricFunctions.java
new file mode 100644
index 0000000..5417236
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/general/MetricFunctions.java
@@ -0,0 +1,37 @@
+package com.zdjizhi.utils.general;
+
+
+import com.zdjizhi.utils.json.JsonTypeUtils;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.storm.utils.general
+ * @Description:
+ * @date 2021/7/2015:31
+ */
+public class MetricFunctions {
+ /**
+ * Long类型的数据求和
+ *
+ * @param value1 第一个值
+ * @param value2 第二个值
+ * @return value1 + value2
+ */
+ public static Long longSum(Object value1, Object value2) {
+ Long res1 = JsonTypeUtils.checkLongValue(value1);
+ Long res2 = JsonTypeUtils.checkLongValue(value2);
+
+ return res1 + res2;
+ }
+
+ /**
+ * 计算Count
+ *
+ * @param count 当前count值
+ * @return count+1
+ */
+ public static Long count(Object count) {
+
+ return JsonTypeUtils.checkLongValue(count) + 1L;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java b/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java
new file mode 100644
index 0000000..5ab46e6
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java
@@ -0,0 +1,97 @@
+package com.zdjizhi.utils.general;
+
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.json.JsonParseUtil;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @ClassNameAggregateUtils
+ * @Author [email protected]
+ * @Date2020/6/23 14:04
+ * @Version V1.0
+ **/
+public class ParseFunctions {
+ /**
+ * 获取filters条件map
+ */
+ private static HashMap<String, String> filtersMap = JsonParseUtil.getFiltersMap();
+
+
+ /**
+ * 解析 dimensions 字段集
+ *
+ * @param dimensions 维度集
+ * @param message 原始日志
+ * @return 结果维度集
+ */
+ public static Map<String, Object> transDimensions(Map<String, String> dimensions, Map<String, Object> message) {
+ HashMap<String, Object> dimensionsObj = new HashMap<>(16);
+
+ for (String dimension : dimensions.keySet()) {
+ dimensionsObj.put(dimension, JsonParseUtil.getValue(message, dimensions.get(dimension)));
+ }
+
+ return dimensionsObj;
+ }
+
+ /**
+ * 构建filters过滤函数,根据Schema指定的函数对日志进行过滤
+ *
+ * @param object 原始日志
+ * @return true or false
+ */
+ public static boolean filterLogs(Map<String, Object> object) {
+ boolean available = false;
+
+ for (String key : filtersMap.keySet()) {
+ switch (key) {
+ case "notempty":
+ Object value = JsonParseUtil.getValue(object, filtersMap.get(key));
+ if (value != null && StringUtil.isNotBlank(value.toString())) {
+ available = true;
+ }
+ break;
+ default:
+ }
+ }
+ return available;
+ }
+
+// /**
+// * 更新缓存中的对应关系map
+// *
+// * @param hashMap 当前缓存对应关系map
+// */
+// public static void updateAppRelation(HashMap<Integer, String> hashMap) {
+// try {
+// Long begin = System.currentTimeMillis();
+// String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.APP_ID_HTTP);
+// if (StringUtil.isNotBlank(schema)) {
+// String data = JSONObject.parseObject(schema).getString("data");
+// JSONArray objects = JSONArray.parseArray(data);
+// for (Object object : objects) {
+// JSONArray jsonArray = JSONArray.parseArray(object.toString());
+// int key = jsonArray.getInteger(0);
+// String value = jsonArray.getString(1);
+// if (hashMap.containsKey(key)) {
+// if (!value.equals(hashMap.get(key))) {
+// hashMap.put(key, value);
+// }
+// } else {
+// hashMap.put(key, value);
+// }
+// }
+// logger.warn("更新缓存对应关系用时:" + (begin - System.currentTimeMillis()));
+// logger.warn("更新缓存中的对应的APP关系,拉取接口数据长度:[" + objects.size());
+// }
+// } catch (RuntimeException e) {
+// logger.error("更新缓存APP-ID失败,异常:" + e);
+// }
+// }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java
new file mode 100644
index 0000000..1adb1d1
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java
@@ -0,0 +1,77 @@
+package com.zdjizhi.utils.http;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ * 获取网关schema的工具类
+ *
+ * @author qidaijie
+ */
+public class HttpClientUtil {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * 请求网关获取schema
+ *
+ * @param http 网关url
+ * @return schema
+ */
+ public static String requestByGetMethod(String http) {
+ CloseableHttpClient httpClient = HttpClients.createDefault();
+ StringBuilder entityStringBuilder;
+
+ HttpGet get = new HttpGet(http);
+ BufferedReader bufferedReader = null;
+ CloseableHttpResponse httpResponse = null;
+ try {
+ httpResponse = httpClient.execute(get);
+ HttpEntity entity = httpResponse.getEntity();
+ entityStringBuilder = new StringBuilder();
+ if (null != entity) {
+ bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024);
+ int intC;
+ while ((intC = bufferedReader.read()) != -1) {
+ char c = (char) intC;
+ if (c == '\n') {
+ break;
+ }
+ entityStringBuilder.append(c);
+ }
+
+ return entityStringBuilder.toString();
+ }
+ } catch (IOException e) {
+ logger.error("Get Schema from Query engine ERROR! Exception message is:" + e);
+ } finally {
+ if (httpClient != null) {
+ try {
+ httpClient.close();
+ } catch (IOException e) {
+ logger.error("Close HTTP Client ERROR! Exception messgae is:" + e);
+ }
+ }
+ if (httpResponse != null) {
+ try {
+ httpResponse.close();
+ } catch (IOException e) {
+ logger.error("Close httpResponse ERROR! Exception messgae is:" + e);
+ }
+ }
+ if (bufferedReader != null) {
+ IOUtils.closeQuietly(bufferedReader);
+ }
+ }
+ return "";
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
new file mode 100644
index 0000000..4a6a01c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
@@ -0,0 +1,357 @@
+package com.zdjizhi.utils.json;
+
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import com.zdjizhi.common.StreamAggregateConfig;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.http.HttpClientUtil;
+import net.sf.cglib.beans.BeanGenerator;
+import net.sf.cglib.beans.BeanMap;
+
+import java.util.*;
+
+/**
+ * 使用FastJson解析json的工具类
+ *
+ * @author qidaijie
+ */
+public class JsonParseUtil {
+
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * 模式匹配,给定一个类型字符串返回一个类类型
+ *
+ * @param type 类型
+ * @return 类类型
+ */
+
+ public static Class getClassName(String type) {
+ Class clazz;
+
+ switch (type) {
+ case "int":
+ clazz = Integer.class;
+ break;
+ case "string":
+ clazz = String.class;
+ break;
+ case "long":
+ clazz = long.class;
+ break;
+ case "array":
+ clazz = List.class;
+ break;
+ case "double":
+ clazz = double.class;
+ break;
+ case "float":
+ clazz = float.class;
+ break;
+ case "char":
+ clazz = char.class;
+ break;
+ case "byte":
+ clazz = byte.class;
+ break;
+ case "boolean":
+ clazz = boolean.class;
+ break;
+ case "short":
+ clazz = short.class;
+ break;
+ default:
+ clazz = String.class;
+ }
+ return clazz;
+ }
+
+
+ /**
+ * 获取属性值的方法
+ *
+ * @param obj 对象
+ * @param property key
+ * @return 属性的值
+ */
+ public static Object getValue(Object obj, String property) {
+
+ try {
+ BeanMap beanMap = BeanMap.create(obj);
+ if (beanMap.containsKey(property)) {
+ return beanMap.get(property);
+ } else {
+ return null;
+ }
+ } catch (RuntimeException e) {
+ logger.error("获取json-value异常,异常key:" + property + "异常信息为:" + e);
+ return null;
+ }
+ }
+
+ /**
+ * 获取属性值的方法
+ *
+ * @param jsonMap 原始日志
+ * @param property key
+ * @return 属性的值
+ */
+ public static Object getValue(Map<String, Object> jsonMap, String property) {
+ try {
+ return jsonMap.getOrDefault(property, null);
+ } catch (RuntimeException e) {
+ logger.error("获取json-value异常,异常key:" + property + "异常信息为:" + e);
+ return null;
+ }
+ }
+
+ /**
+ * long 类型检验转换方法,若为空返回基础值
+ *
+ * @return Long value
+ */
+ public static Long getLong(Map<String, Object> jsonMap, String property) {
+ Object value = jsonMap.getOrDefault(property, null);
+ Long longVal = TypeUtils.castToLong(value);
+
+ if (longVal == null) {
+ return 0L;
+ }
+
+ return longVal;
+ }
+
+ public static String getString(Map<String, Object> jsonMap, String property) {
+ Object value = jsonMap.getOrDefault(property, null);
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Map) {
+ return JsonMapper.toJsonString(value);
+ }
+
+ if (value instanceof List) {
+ return JsonMapper.toJsonString(value);
+ }
+
+ return value.toString();
+ }
+
+ /**
+ * 更新属性值的方法
+ *
+ * @param jsonMap 原始日志json map
+ * @param property 更新的key
+ * @param value 更新的值
+ */
+ public static void setValue(Map<String, Object> jsonMap, String property, Object value) {
+ try {
+ jsonMap.put(property, value);
+ } catch (RuntimeException e) {
+ logger.error("赋予实体类错误类型数据", e);
+ }
+ }
+
+ /**
+ * 更新属性值的方法
+ *
+ * @param obj 对象
+ * @param property 更新的key
+ * @param value 更新的值
+ */
+ public static void setValue(Object obj, String property, Object value) {
+ try {
+ BeanMap beanMap = BeanMap.create(obj);
+ beanMap.put(property, value);
+ } catch (ClassCastException e) {
+ logger.error("赋予实体类错误类型数据", e);
+ }
+ }
+
+ /**
+ * 根据反射生成对象的方法
+ *
+ * @param properties 反射类用的map
+ * @return 生成的Object类型的对象
+ */
+ public static Object generateObject(Map properties) {
+ BeanGenerator generator = new BeanGenerator();
+ Set keySet = properties.keySet();
+ for (Object aKeySet : keySet) {
+ String key = (String) aKeySet;
+ generator.addProperty(key, (Class) properties.get(key));
+ }
+ return generator.create();
+ }
+
+ /**
+ * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象
+ *
+ * @return 用于反射生成schema类型的对象的一个map集合
+ */
+ public static HashMap<String, String[]> getActionMap() {
+ HashMap<String, String[]> map = new HashMap<>(16);
+
+ String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP);
+ DocumentContext parse = JsonPath.parse(schema);
+
+ List<Object> actions = parse.read("$.data.doc.action[*]");
+
+ for (Object action : actions) {
+ map.put(JsonPath.read(action, "$.label"),
+ JsonPath.read(action, "$.metrics").toString().split(StreamAggregateConfig.FORMAT_SPLITTER));
+// System.out.println(JsonPath.read(action, "$.label")+JsonPath.read(action, "$.metrics").toString());
+ }
+
+ return map;
+ }
+
+ /**
+ * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象
+ *
+ * @return 用于反射生成schema类型的对象的一个map集合
+ */
+ public static HashMap<String, String[]> getMetricsMap() {
+ HashMap<String, String[]> map = new HashMap<>(16);
+
+ String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP);
+ DocumentContext parse = JsonPath.parse(schema);
+
+ List<Object> metrics = parse.read("$.data.doc.metrics[*]");
+
+ for (Object metric : metrics) {
+ map.put(JsonPath.read(metric, "$.name"),
+ new String[]{JsonPath.read(metric, "$.function"), JsonPath.read(metric, "$.fieldName")}
+ );
+ }
+ return map;
+ }
+
+ /**
+ * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象
+ *
+ * @return 用于反射生成schema类型的对象的一个map集合
+ */
+ public static String getTimeKey() {
+
+ String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP);
+
+ return JsonPath.read(schema, "$.data.doc.timestamp.name");
+ }
+
+
+ /**
+ * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象
+ *
+ * @return 用于反射生成schema类型的对象的一个map集合
+ */
+ public static HashMap<String, Class> getResultLogMap() {
+ HashMap<String, Class> map = new HashMap<>(16);
+
+ String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP);
+ DocumentContext parse = JsonPath.parse(schema);
+
+ List<Object> dimensions = parse.read("$.data.doc.dimensions[*]");
+
+ for (Object dimension : dimensions) {
+ map.put(JsonPath.read(dimension, "$.name"),
+ JsonParseUtil.getClassName(JsonPath.read(dimension, "$.type")));
+ }
+
+ List<Object> metrics = parse.read("$.data.doc.metrics[*]");
+ for (Object metric : metrics) {
+ map.put(JsonPath.read(metric, "$.name"),
+ JsonParseUtil.getClassName(JsonPath.read(metric, "$.type")));
+ }
+
+ return map;
+ }
+
+ /**
+ * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象
+ *
+ * @return 用于反射生成schema类型的对象的一个map集合
+ */
+ public static HashMap<String, String> getDimensionsMap() {
+ HashMap<String, String> map = new HashMap<>(16);
+
+ String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP);
+ DocumentContext parse = JsonPath.parse(schema);
+
+ List<Object> dimensions = parse.read("$.data.doc.dimensions[*]");
+
+ for (Object dimension : dimensions) {
+ map.put(JsonPath.read(dimension, "$.name"),
+ JsonPath.read(dimension, "$.fieldName"));
+ }
+
+ return map;
+ }
+
+
+ /**
+ * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象
+ *
+ * @return 用于反射生成schema类型的对象的一个map集合
+ */
+ public static HashMap<String, String> getFiltersMap() {
+ HashMap<String, String> map = new HashMap<>(16);
+ String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP);
+ DocumentContext parse = JsonPath.parse(schema);
+
+ List<Object> filters = parse.read("$.data.doc.filters[*]");
+ for (Object filter : filters) {
+ map.put(JsonPath.read(filter, "$.type"), JsonPath.read(filter, "$.fieldName"));
+ }
+
+ return map;
+ }
+
+
+ /**
+ * 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList paramlist)
+ *
+ * @return 任务列表
+ */
+ public static ArrayList<String[]> getTransformsList() {
+ ArrayList<String[]> list = new ArrayList<>();
+ String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP);
+ DocumentContext parse = JsonPath.parse(schema);
+
+ List<Object> transforms = parse.read("$.data.doc.transforms[*]");
+ for (Object transform : transforms) {
+ String function = JsonPath.read(transform, "$.function").toString();
+ String name = JsonPath.read(transform, "$.name").toString();
+ String fieldName = JsonPath.read(transform, "$.fieldName").toString();
+ String parameters = JsonPath.read(transform, "$.parameters").toString();
+ list.add(new String[]{function, name, fieldName, parameters});
+ }
+
+ return list;
+ }
+
+ /**
+ * 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList paramlist)
+ *
+ * @return 任务列表
+ */
+ public static String[] getHierarchy() {
+ String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP);
+ DocumentContext parse = JsonPath.parse(schema);
+ List<Object> transforms = parse.read("$.data.doc.transforms[*]");
+ for (Object transform : transforms) {
+ String function = JsonPath.read(transform, "$.function").toString();
+ if ("hierarchy".equals(function)) {
+ String name = JsonPath.read(transform, "$.name").toString();
+ String parameters = JsonPath.read(transform, "$.parameters").toString();
+ return new String[]{name, parameters};
+ }
+ }
+ return null;
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java
new file mode 100644
index 0000000..034f76a
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java
@@ -0,0 +1,142 @@
+package com.zdjizhi.utils.json;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.exception.AnalysisException;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package PACKAGE_NAME
+ * @Description:
+ * @date 2021/7/1217:34
+ */
+public class JsonTypeUtils {
+ private static final Log logger = LogFactory.get();
+ /**
+ * String 类型检验转换方法
+ *
+ * @param value json value
+ * @return String value
+ */
+ public static String checkString(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Map) {
+ return JsonMapper.toJsonString(value);
+ }
+
+ if (value instanceof List) {
+ return JsonMapper.toJsonString(value);
+ }
+
+ return value.toString();
+ }
+
+ /**
+ * array 类型检验转换方法
+ *
+ * @param value json value
+ * @return List value
+ */
+ private static Map checkObject(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Map) {
+ return (Map) value;
+ }
+
+ throw new AnalysisException("can not cast to map, value : " + value);
+ }
+
+ /**
+ * array 类型检验转换方法
+ *
+ * @param value json value
+ * @return List value
+ */
+ private static List checkArray(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof List) {
+ return (List) value;
+ }
+
+ throw new AnalysisException("can not cast to List, value : " + value);
+ }
+
+ private static Long checkLong(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ return TypeUtils.castToLong(value);
+ }
+
+ /**
+ * long 类型检验转换方法,若为空返回基础值
+ *
+ * @param value json value
+ * @return Long value
+ */
+ public static long checkLongValue(Object value) {
+
+ Long longVal = TypeUtils.castToLong(value);
+
+ if (longVal == null) {
+ return 0L;
+ }
+
+ return longVal;
+ }
+
+ /**
+ * Double 类型校验转换方法
+ *
+ * @param value json value
+ * @return Double value
+ */
+ private static Double checkDouble(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ return TypeUtils.castToDouble(value);
+ }
+
+
+ private static Integer checkInt(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ return TypeUtils.castToInt(value);
+ }
+
+
+ /**
+ * int 类型检验转换方法,若为空返回基础值
+ *
+ * @param value json value
+ * @return int value
+ */
+ private static int getIntValue(Object value) {
+
+ Integer intVal = TypeUtils.castToInt(value);
+
+ if (intVal == null) {
+ return 0;
+ }
+ return intVal;
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java
new file mode 100644
index 0000000..01e8540
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java
@@ -0,0 +1,180 @@
+package com.zdjizhi.utils.json;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.StreamAggregateConfig;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.exception.AnalysisException;
+
+
+/**
+ * @author qidaijie
+ * @Package PACKAGE_NAME
+ * @Description:
+ * @date 2021/7/1218:20
+ */
+public class TypeUtils {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * Integer 类型判断方法
+ *
+ * @param value json value
+ * @return Integer value or null
+ */
+ public static Object castToIfFunction(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof String) {
+ return value.toString();
+ }
+
+ if (value instanceof Integer) {
+ return ((Number) value).intValue();
+ }
+
+ if (value instanceof Long) {
+ return ((Number) value).longValue();
+ }
+
+// if (value instanceof Map) {
+// return (Map) value;
+// }
+//
+// if (value instanceof List) {
+// return Collections.singletonList(value.toString());
+// }
+
+ if (value instanceof Boolean) {
+ return (Boolean) value ? 1 : 0;
+ }
+
+ throw new AnalysisException("can not cast to int, value : " + value);
+ }
+
+ /**
+ * Integer 类型判断方法
+ *
+ * @param value json value
+ * @return Integer value or null
+ */
+ static Integer castToInt(Object value) {
+
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Integer) {
+ return (Integer) value;
+ }
+
+ //此判断数值超范围不抛出异常,会截取成对应类型数值
+// if (value instanceof Number) {
+// return ((Number) value).intValue();
+// }
+
+ if (value instanceof String) {
+ String strVal = (String) value;
+ if (StringUtil.isBlank(strVal)) {
+ return null;
+ }
+
+ //将 10,20 类数据转换为10
+ if (strVal.contains(StreamAggregateConfig.FORMAT_SPLITTER)) {
+ strVal = strVal.split(StreamAggregateConfig.FORMAT_SPLITTER)[0];
+ }
+
+ try {
+ return Integer.parseInt(strVal);
+ } catch (NumberFormatException ex) {
+ logger.error("String change Integer Error,The error Str is:" + strVal);
+ }
+ }
+
+ if (value instanceof Boolean) {
+ return (Boolean) value ? 1 : 0;
+ }
+
+ throw new AnalysisException("can not cast to int, value : " + value);
+ }
+
+ /**
+ * Double类型判断方法
+ *
+ * @param value json value
+ * @return double value or null
+ */
+ static Double castToDouble(Object value) {
+
+ if (value instanceof Double) {
+ return (Double) value;
+ }
+
+ //此判断数值超范围不抛出异常,会截取成对应类型数值
+// if (value instanceof Number) {
+// return ((Number) value).doubleValue();
+// }
+
+ if (value instanceof String) {
+ String strVal = (String) value;
+
+ if (StringUtil.isBlank(strVal)) {
+ return null;
+ }
+
+ //将 10,20 类数据转换为10
+ if (strVal.contains(StreamAggregateConfig.FORMAT_SPLITTER)) {
+ strVal = strVal.split(StreamAggregateConfig.FORMAT_SPLITTER)[0];
+ }
+
+ try {
+ return Double.parseDouble(strVal);
+ } catch (NumberFormatException ex) {
+ logger.error("String change Double Error,The error Str is:" + strVal);
+ }
+ }
+
+ throw new AnalysisException("can not cast to double, value : " + value);
+ }
+
+ /**
+ * Long类型判断方法
+ *
+ * @param value json value
+ * @return (Long)value or null
+ */
+ static Long castToLong(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+// 此判断数值超范围不抛出异常,会截取成对应类型数值
+ if (value instanceof Number) {
+ return ((Number) value).longValue();
+ }
+
+ if (value instanceof String) {
+ String strVal = (String) value;
+
+ if (StringUtil.isBlank(strVal)) {
+ return null;
+ }
+
+ //将 10,20 类数据转换为10
+ if (strVal.contains(StreamAggregateConfig.FORMAT_SPLITTER)) {
+ strVal = strVal.split(StreamAggregateConfig.FORMAT_SPLITTER)[0];
+ }
+
+ try {
+ return Long.parseLong(strVal);
+ } catch (NumberFormatException ex) {
+ logger.error("String change Long Error,The error Str is:" + strVal);
+ }
+ }
+
+ throw new AnalysisException("can not cast to long, value : " + value);
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
new file mode 100644
index 0000000..2608187
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
@@ -0,0 +1,36 @@
+package com.zdjizhi.utils.kafka;
+
+import com.zdjizhi.common.StreamAggregateConfig;
+import org.apache.kafka.common.config.SslConfigs;
+
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description:
+ * @date 2021/9/610:37
+ */
+class CertUtils {
+ static void chooseCert(String type, Properties properties) {
+ switch (type) {
+ case "SSL":
+ properties.put("security.protocol", "SSL");
+ properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+ properties.put("ssl.keystore.location", StreamAggregateConfig.TOOLS_LIBRARY + "keystore.jks");
+ properties.put("ssl.keystore.password", StreamAggregateConfig.KAFKA_PIN);
+ properties.put("ssl.truststore.location", StreamAggregateConfig.TOOLS_LIBRARY + "truststore.jks");
+ properties.put("ssl.truststore.password", StreamAggregateConfig.KAFKA_PIN);
+ properties.put("ssl.key.password", StreamAggregateConfig.KAFKA_PIN);
+ break;
+ case "SASL":
+ properties.put("security.protocol", "SASL_PLAINTEXT");
+ properties.put("sasl.mechanism", "PLAIN");
+ properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ + StreamAggregateConfig.KAFKA_USER + " password=" + StreamAggregateConfig.KAFKA_PIN + ";");
+ break;
+ default:
+ }
+
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
new file mode 100644
index 0000000..a24ab4e
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
@@ -0,0 +1,42 @@
+package com.zdjizhi.utils.kafka;
+
+import com.zdjizhi.common.StreamAggregateConfig;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.config.SslConfigs;
+
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description:
+ * @date 2021/6/813:54
+ */
+public class Consumer {
+ private static Properties createConsumerConfig() {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", StreamAggregateConfig.INPUT_KAFKA_SERVERS);
+ properties.put("group.id", StreamAggregateConfig.GROUP_ID);
+ properties.put("session.timeout.ms", "60000");
+ properties.put("max.poll.records", 3000);
+ properties.put("max.partition.fetch.bytes", 31457280);
+ properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+ CertUtils.chooseCert(StreamAggregateConfig.KAFKA_SOURCE_PROTOCOL,properties);
+
+ return properties;
+ }
+
+ public static FlinkKafkaConsumer<String> getKafkaConsumer() {
+ FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(StreamAggregateConfig.INPUT_KAFKA_TOPIC,
+ new SimpleStringSchema(), createConsumerConfig());
+
+ kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
+ kafkaConsumer.setStartFromGroupOffsets();
+
+ return kafkaConsumer;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/Producer.java
new file mode 100644
index 0000000..65330b5
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/kafka/Producer.java
@@ -0,0 +1,53 @@
+package com.zdjizhi.utils.kafka;
+
+import com.zdjizhi.common.StreamAggregateConfig;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SslConfigs;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description:
+ * @date 2021/6/814:04
+ */
+public class Producer {
+
+ private static Properties createProducerConfig() {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", StreamAggregateConfig.OUTPUT_KAFKA_SERVERS);
+ properties.put("acks", StreamAggregateConfig.PRODUCER_ACK);
+ properties.put("retries", StreamAggregateConfig.RETRIES);
+ properties.put("linger.ms", StreamAggregateConfig.LINGER_MS);
+ properties.put("request.timeout.ms", StreamAggregateConfig.REQUEST_TIMEOUT_MS);
+ properties.put("batch.size", StreamAggregateConfig.BATCH_SIZE);
+ properties.put("buffer.memory", StreamAggregateConfig.BUFFER_MEMORY);
+ properties.put("max.request.size", StreamAggregateConfig.MAX_REQUEST_SIZE);
+ properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, StreamAggregateConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
+
+ CertUtils.chooseCert(StreamAggregateConfig.KAFKA_SINK_PROTOCOL, properties);
+
+ return properties;
+ }
+
+
+ public static FlinkKafkaProducer<String> getKafkaProducer() {
+ FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
+ StreamAggregateConfig.OUTPUT_KAFKA_TOPIC,
+ new SimpleStringSchema(),
+ createProducerConfig(), Optional.empty());
+
+ //启用此选项将使生产者仅记录失败日志而不是捕获和重新抛出它们
+ kafkaProducer.setLogFailuresOnly(false);
+
+ //写入kafka的消息携带时间戳
+// kafkaProducer.setWriteTimestampToKafka(true);
+
+
+ return kafkaProducer;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/system/StreamAggregateConfigurations.java b/src/main/java/com/zdjizhi/utils/system/StreamAggregateConfigurations.java
new file mode 100644
index 0000000..3ea18a5
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/system/StreamAggregateConfigurations.java
@@ -0,0 +1,70 @@
+package com.zdjizhi.utils.system;
+
+import com.zdjizhi.utils.StringUtil;
+
+import java.io.IOException;
+import java.util.Locale;
+import java.util.Properties;
+
+
+/**
+ * @author Administrator
+ */
+
+public final class StreamAggregateConfigurations {
+
+ private static Properties propKafka = new Properties();
+ private static Properties propService = new Properties();
+
+
+ public static String getStringProperty(Integer type, String key) {
+ if (type == 0) {
+ return propService.getProperty(key);
+ } else if (type == 1) {
+ return propKafka.getProperty(key);
+ } else {
+ return null;
+ }
+
+ }
+
+ public static Integer getIntProperty(Integer type, String key) {
+ if (type == 0) {
+ return Integer.parseInt(propService.getProperty(key));
+ } else if (type == 1) {
+ return Integer.parseInt(propKafka.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Long getLongProperty(Integer type, String key) {
+ if (type == 0) {
+ return Long.parseLong(propService.getProperty(key));
+ } else if (type == 1) {
+ return Long.parseLong(propKafka.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Boolean getBooleanProperty(Integer type, String key) {
+ if (type == 0) {
+ return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
+ } else if (type == 1) {
+ return StringUtil.equals(propKafka.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
+ } else {
+ return null;
+ }
+ }
+
+ static {
+ try {
+ propService.load(StreamAggregateConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
+ propKafka.load(StreamAggregateConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties"));
+ } catch (IOException | RuntimeException e) {
+ propKafka = null;
+ propService = null;
+ }
+ }
+}
diff --git a/src/main/java/log4j.properties b/src/main/java/log4j.properties
new file mode 100644
index 0000000..9d91936
--- /dev/null
+++ b/src/main/java/log4j.properties
@@ -0,0 +1,25 @@
+#Log4j
+log4j.rootLogger=info,console,file
+# 控制台日志设置
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=info
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
+
+# 文件日志设置
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.file.Threshold=info
+log4j.appender.file.encoding=UTF-8
+log4j.appender.file.Append=true
+#路径请用相对路径,做好相关测试输出到应用目下
+log4j.appender.file.file=${nis.root}/log/galaxy-name.log
+log4j.appender.file.DatePattern='.'yyyy-MM-dd
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
+log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
+#MyBatis 配置,com.nis.web.dao是mybatis接口所在包
+log4j.logger.com.nis.web.dao=debug
+#bonecp数据源配置
+log4j.category.com.jolbox=debug,console
+
+
diff --git a/src/test/java/com/zdjizhi/FunctionsTest.java b/src/test/java/com/zdjizhi/FunctionsTest.java
new file mode 100644
index 0000000..6e3a20b
--- /dev/null
+++ b/src/test/java/com/zdjizhi/FunctionsTest.java
@@ -0,0 +1,33 @@
+package com.zdjizhi;
+
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.json.JsonParseUtil;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi
+ * @Description:
+ * @date 2021/9/1714:22
+ */
+public class FunctionsTest {
+ private static HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricsMap();
+
+ @Test
+ public void actionTest() {
+ HashMap<String, String[]> actionMap = JsonParseUtil.getActionMap();
+ String[] metricNames = actionMap.getOrDefault("", actionMap.get("Default"));
+ System.out.println(actionMap.toString());
+ System.out.println(Arrays.toString(metricNames));
+
+
+
+ }
+
+
+
+}