diff options
| author | qidaijie <[email protected]> | 2021-09-27 11:13:25 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2021-09-27 11:13:25 +0800 |
| commit | 99bf45cdbc2186d37b9a3876bb3b4ab2b2620c3f (patch) | |
| tree | 14342d6cd2ce86b73b51b8dcfe88e0c1571d9b86 | |
| parent | dd31a0bf9612f93f8d1350f50b9f48456fb94c9a (diff) | |
提交2109版livecharts
24 files changed, 1927 insertions, 0 deletions
@@ -0,0 +1,240 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>com.zdjizhi</groupId> + <artifactId>log-olap-analysis-schema</artifactId> + <version>210908-security</version> + + <name>log-olap-analysis-schema</name> + <url>http://www.example.com</url> + + + <repositories> + <repository> + <id>nexus</id> + <name>Team Nexus Repository</name> + <url>http://192.168.40.125:8099/content/groups/public</url> + </repository> + + <repository> + <id>maven-ali</id> + <url>http://maven.aliyun.com/nexus/content/groups/public/</url> + <releases> + <!--<enabled>true</enabled>--> + </releases> + <snapshots> + <!--<enabled>true</enabled>--> + <checksumPolicy>fail</checksumPolicy> + </snapshots> + </repository> + </repositories> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <flink.version>1.13.1</flink.version> + <hadoop.version>2.7.1</hadoop.version> + <kafka.version>1.0.0</kafka.version> + <hbase.version>2.2.3</hbase.version> + <scope.type>provided</scope.type> + <!--<scope.type>compile</scope.type>--> + </properties> + + <build> + <plugins> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>2.4.2</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <transformers> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>com.zdjizhi.topology.StreamAggregateTopology</mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>io.github.zlika</groupId> + <artifactId>reproducible-build-maven-plugin</artifactId> + <version>0.2</version> + <executions> + <execution> + <goals> + <goal>strip-jar</goal> + </goals> + <phase>package</phase> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>2.3.2</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + </plugins> + <resources> + <resource> + <directory>properties</directory> + <includes> + <include>**/*.properties</include> + <include>**/*.xml</include> + </includes> + <filtering>false</filtering> + </resource> + + <resource> + <directory>src\main\java</directory> + <includes> + <include>log4j.properties</include> + </includes> + <filtering>false</filtering> + </resource> + </resources> + </build> + + <dependencies> + + <dependency> + <groupId>com.zdjizhi</groupId> + <artifactId>galaxy</artifactId> + <version>1.0.6</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table --> + <!--<dependency>--> + <!--<groupId>org.apache.flink</groupId>--> + <!--<artifactId>flink-table</artifactId>--> + <!--<version>${flink.version}</version>--> + <!--<type>pom</type>--> + <!--<scope>${scope.type}</scope>--> + <!--</dependency>--> + + <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${flink.version}</version> + <scope>${scope.type}</scope> + </dependency> + + + <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.12</artifactId> + <version>${flink.version}</version> + <scope>${scope.type}</scope> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_2.12</artifactId> + <version>${flink.version}</version> + <scope>${scope.type}</scope> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka_2.12</artifactId> + <version>${flink.version}</version> + <!--<scope>${scope.type}</scope>--> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${flink.version}</version> + <scope>${scope.type}</scope> + </dependency> + + <dependency> + <groupId>cglib</groupId> + <artifactId>cglib-nodep</artifactId> + <version>3.2.4</version> + </dependency> + + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <version>5.3.2</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.5.2</version> + </dependency> + + <dependency> + <groupId>com.jayway.jsonpath</groupId> + <artifactId>json-path</artifactId> + <version>2.4.0</version> + </dependency> + + <dependency> + <groupId>io.prometheus</groupId> + <artifactId>simpleclient_pushgateway</artifactId> + <version>0.9.0</version> + </dependency> + + <dependency> + <groupId>cn.hutool</groupId> + <artifactId>hutool-all</artifactId> + <version>5.5.2</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.21</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>1.7.21</version> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + + </dependencies> +</project> + diff --git a/properties/default_config.properties b/properties/default_config.properties new file mode 100644 index 0000000..2b9bfb1 --- /dev/null +++ b/properties/default_config.properties @@ -0,0 +1,29 @@ +#producer���ԵĴ������� +retries=0 + +#���ĺ������˵һ��Batch������֮��������ã��������Batch��û��д���������뷢�ͳ�ȥ�� +linger.ms=5 + +#����ڳ�ʱ֮ǰδ�յ���Ӧ���ͻ��˽��ڱ�Ҫʱ���·������� +request.timeout.ms=30000 + +#producer���ǰ���batch���з��͵�,���δ�С��Ĭ��:16384 +batch.size=262144 + +#Producer�����ڻ�����Ϣ�Ļ�������С +buffer.memory=67108864 + +#�������������ÿ�η���Kafka���������������С,Ĭ��1048576 +max.request.size=5242880 + +#kafka SASL��֤�û��� +kafka.user=admin + +#kafka SASL��SSL��֤���� +kafka.pin=galaxy2019 + +#kafka source protocol; SSL or SASL +kafka.source.protocol=SASL + +#kafka sink protocol; SSL or SASL +kafka.sink.protocol=SASL
\ No newline at end of file diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties new file mode 100644 index 0000000..91f018d --- /dev/null +++ b/properties/service_flow_config.properties @@ -0,0 +1,51 @@ +#--------------------------------地址配置------------------------------# + +#管理kafka地址 +input.kafka.servers=192.168.44.12:9094 + +#管理输出kafka地址 +output.kafka.servers=192.168.44.12:9094 + +#--------------------------------HTTP------------------------------# +#kafka 证书地址 +tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ + +#网关的schema位置 +#schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/liveChart_interim +schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/liveChart_session + +#网关APP_ID 获取接口 +app.id.http=http://192.168.44.67:9999/open-api/appDicList + +#--------------------------------Kafka消费组信息------------------------------# + +#kafka 接收数据topic +input.kafka.topic=test +#input.kafka.topic=SESSION-RECORD +#input.kafka.topic=INTERIM-SESSION-RECORD + +#补全数据 输出 topic +output.kafka.topic=test-result + +#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; +group.id=liveCharts-session-test-20210811-1 + +#生产者压缩模式 none or snappy +producer.kafka.compression.type=none + +#生产者ack +producer.ack=1 + +#--------------------------------topology配置------------------------------# + +#consumer 并行度 +consumer.parallelism=1 + +#map函数并行度 +parse.parallelism=1 + +#app_id 更新时间,如填写0则不更新缓存 +app.tick.tuple.freq.secs=0 + +#聚合窗口时间 +count.window.time=15
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java new file mode 100644 index 0000000..3647c5a --- /dev/null +++ b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java @@ -0,0 +1,56 @@ +package com.zdjizhi.common; + + +import com.zdjizhi.utils.system.StreamAggregateConfigurations; + +/** + * @author Administrator + */ +public class StreamAggregateConfig { + + public static final String FORMAT_SPLITTER = ","; + public static final String PROTOCOL_SPLITTER = "\\."; + + /** + * System + */ + public static final Integer CONSUMER_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "consumer.parallelism"); + public static final Integer PARSE_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "parse.parallelism"); + public static final Integer APP_TICK_TUPLE_FREQ_SECS = StreamAggregateConfigurations.getIntProperty(0, "app.tick.tuple.freq.secs"); + public static final Integer COUNT_WINDOW_TIME = StreamAggregateConfigurations.getIntProperty(0, "count.window.time"); + public static final String TOOLS_LIBRARY = StreamAggregateConfigurations.getStringProperty(0, "tools.library"); + + /** + * kafka source + */ + public static final String INPUT_KAFKA_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "input.kafka.servers"); + public static final String OUTPUT_KAFKA_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "output.kafka.servers"); + public static final String GROUP_ID = StreamAggregateConfigurations.getStringProperty(0, "group.id"); + public static final String OUTPUT_KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "output.kafka.topic"); + public static final String INPUT_KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "input.kafka.topic"); + public static final String PRODUCER_ACK = StreamAggregateConfigurations.getStringProperty(0, "producer.ack"); + public static final String KAFKA_SOURCE_PROTOCOL = StreamAggregateConfigurations.getStringProperty(1, "kafka.source.protocol"); + public static final String KAFKA_SINK_PROTOCOL = StreamAggregateConfigurations.getStringProperty(1, "kafka.sink.protocol"); + public static final String KAFKA_USER = StreamAggregateConfigurations.getStringProperty(1, "kafka.user"); + public static final String KAFKA_PIN = StreamAggregateConfigurations.getStringProperty(1, "kafka.pin"); + public static final String RETRIES = StreamAggregateConfigurations.getStringProperty(1, "retries"); + public static final String LINGER_MS = StreamAggregateConfigurations.getStringProperty(1, "linger.ms"); + public static final Integer REQUEST_TIMEOUT_MS = StreamAggregateConfigurations.getIntProperty(1, "request.timeout.ms"); + public static final Integer BATCH_SIZE = StreamAggregateConfigurations.getIntProperty(1, "batch.size"); + public static final Integer BUFFER_MEMORY = StreamAggregateConfigurations.getIntProperty(1, "buffer.memory"); + public static final Integer MAX_REQUEST_SIZE = StreamAggregateConfigurations.getIntProperty(1, "max.request.size"); + + + /** + * kafka限流配置-20201117 + */ + public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = StreamAggregateConfigurations.getStringProperty(0, "producer.kafka.compression.type"); + + /** + * http + */ + public static final String SCHEMA_HTTP = StreamAggregateConfigurations.getStringProperty(0, "schema.http"); + public static final String APP_ID_HTTP = StreamAggregateConfigurations.getStringProperty(0, "app.id.http"); + + +}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java new file mode 100644 index 0000000..301d862 --- /dev/null +++ b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java @@ -0,0 +1,56 @@ +package com.zdjizhi.topology; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.StreamAggregateConfig; +import com.zdjizhi.utils.functions.*; +import com.zdjizhi.utils.kafka.Consumer; +import com.zdjizhi.utils.kafka.Producer; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.datastream.WindowedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; + + +/** + * @author qidaijie + * @Package com.zdjizhi.topology + * @Description: + * @date 2021/5/2016:42 + */ +public class StreamAggregateTopology { + private static final Log logger = LogFactory.get(); + + public static void main(String[] args) { + try { + final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); + + environment.enableCheckpointing(5000); + + DataStream<String> streamSource = environment.addSource(Consumer.getKafkaConsumer()) + .setParallelism(StreamAggregateConfig.CONSUMER_PARALLELISM); + + SingleOutputStreamOperator<Tuple3<String, String, String>> parseDataMap = streamSource.map(new MapParseFunction()).name("ParseDataMap") + .setParallelism(StreamAggregateConfig.PARSE_PARALLELISM); + + WindowedStream<Tuple3<String, String, String>, String, TimeWindow> window = parseDataMap.keyBy(new KeyByFunction()) + .window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.COUNT_WINDOW_TIME))); + + SingleOutputStreamOperator<String> metricCountWindow = window.process(new CountWindowFunction()).name("MetricCountWindow") + .setParallelism(StreamAggregateConfig.PARSE_PARALLELISM); + + metricCountWindow.flatMap(new ResultFlatMapFunction()).name("ResultFlatMap").setParallelism(StreamAggregateConfig.PARSE_PARALLELISM) + .addSink(Producer.getKafkaProducer()).name("LogSinkKafka").setParallelism(StreamAggregateConfig.PARSE_PARALLELISM); + + environment.execute(args[0]); + } catch (Exception e) { + logger.error("This Flink task start ERROR! Exception information is :" + e); + } + + } + +} diff --git a/src/main/java/com/zdjizhi/utils/exception/AnalysisException.java b/src/main/java/com/zdjizhi/utils/exception/AnalysisException.java new file mode 100644 index 0000000..ad251a2 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/exception/AnalysisException.java @@ -0,0 +1,18 @@ +package com.zdjizhi.utils.exception; + +/** + * @author qidaijie + * @Package com.zdjizhi.storm.utils.execption + * @Description: + * @date 2021/3/259:42 + */ +public class AnalysisException extends RuntimeException { + + public AnalysisException() { + } + + public AnalysisException(String message) { + super(message); + } + +} diff --git a/src/main/java/com/zdjizhi/utils/functions/CountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/CountWindowFunction.java new file mode 100644 index 0000000..5f22a6b --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/CountWindowFunction.java @@ -0,0 +1,105 @@ +package com.zdjizhi.utils.functions; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.general.MetricFunctions; +import com.zdjizhi.utils.json.JsonParseUtil; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/7/2113:55 + */ +public class CountWindowFunction extends ProcessWindowFunction<Tuple3<String, String, String>, String, String, TimeWindow> { + private static final Logger logger = LoggerFactory.getLogger(CountWindowFunction.class); + + private static HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricsMap(); + private static HashMap<String, String[]> actionMap = JsonParseUtil.getActionMap(); + private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(32); + private static String resultTimeKey = JsonParseUtil.getTimeKey(); + + @Override + @SuppressWarnings("unchecked") + public void process(String key, Context context, Iterable<Tuple3<String, String, String>> input, Collector<String> output) { + try { + for (Tuple3<String, String, String> tuple : input) { + String label = tuple.f0; + //action中某个协议的所有function,如果没有就默认 + String[] metricNames = actionMap.getOrDefault(label, actionMap.get("Default")); + String dimensions = tuple.f1; + String message = tuple.f2; + if (StringUtil.isNotBlank(message)){ + Map<String, Object> dimensionsObj = (Map<String, Object>) JsonMapper.fromJsonString(dimensions, Map.class); + Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class); + + Map<String, Object> cacheMessage = cacheMap.getOrDefault(dimensions, dimensionsObj); + for (String name : metricNames) { + String[] metrics = metricsMap.get(name); + String function = metrics[0]; + String fieldName = metrics[1]; + functionSet(function, cacheMessage, name, cacheMessage.get(name), JsonParseUtil.getValue(object, fieldName)); + + } + cacheMap.put(dimensions, cacheMessage); + } + } + + if (!cacheMap.isEmpty()) { + Long endTime = context.window().getEnd() / 1000; + + for (String countKey : cacheMap.keySet()) { + Map<String, Object> resultMap = cacheMap.get(countKey); + JsonParseUtil.setValue(resultMap, resultTimeKey, endTime); + output.collect(JsonMapper.toJsonString(resultMap)); + } +// cacheMap.clear(); + } + + } catch (RuntimeException e) { + logger.error("windows count error,message:" + e); + e.printStackTrace(); + } finally { + cacheMap.clear(); + } + } + + /** + * 根据schema描述对应字段进行操作的 函数集合 + * + * @param function 函数名称 + * @param cacheMessage 结果集 + * @param nameValue 当前值 + * @param fieldNameValue 新加值 + */ + private static void functionSet(String function, Map<String, Object> cacheMessage, String resultName, Object nameValue, Object fieldNameValue) { + switch (function) { + case "sum": + cacheMessage.put(resultName, MetricFunctions.longSum(nameValue, fieldNameValue)); + break; + case "count": + cacheMessage.put(resultName, MetricFunctions.count(nameValue)); + break; + case "unique_sip_num": + //TODO + break; + case "unique_cip_num": + //TODO + break; + default: + break; + } + } +} diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java new file mode 100644 index 0000000..de507ad --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java @@ -0,0 +1,17 @@ +package com.zdjizhi.utils.functions; + +import com.zdjizhi.utils.StringUtil; +import org.apache.flink.api.common.functions.FilterFunction; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/5/2715:01 + */ +public class FilterNullFunction implements FilterFunction<String> { + @Override + public boolean filter(String message) { + return StringUtil.isNotBlank(message); + } +} diff --git a/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java b/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java new file mode 100644 index 0000000..0b00b3c --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java @@ -0,0 +1,19 @@ +package com.zdjizhi.utils.functions; + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple3; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/7/2112:13 + */ +public class KeyByFunction implements KeySelector<Tuple3<String, String, String>, String> { + + @Override + public String getKey(Tuple3<String, String, String> value) throws Exception { + //以map拼接的key分组 + return value.f1; + } +} diff --git a/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java new file mode 100644 index 0000000..41a6109 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java @@ -0,0 +1,122 @@ +package com.zdjizhi.utils.functions; + +import com.zdjizhi.common.StreamAggregateConfig; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.general.ParseFunctions; +import com.zdjizhi.utils.json.JsonParseUtil; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/5/2715:01 + */ +public class MapParseFunction implements MapFunction<String, Tuple3<String, String, String>> { + private static final Logger logger = LoggerFactory.getLogger(CountWindowFunction.class); + + private static ArrayList<String[]> jobList = JsonParseUtil.getTransformsList(); + + private static HashMap<String, String> dimensionsMap = JsonParseUtil.getDimensionsMap(); + + @Override + @SuppressWarnings("unchecked") + public Tuple3<String, String, String> map(String message) { + try { + if (StringUtil.isNotBlank(message)) { + Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class); + Map<String, Object> dimensionsObj = ParseFunctions.transDimensions(dimensionsMap, object); + if (ParseFunctions.filterLogs(object)) { + for (String[] strings : jobList) { + //函数名称 + String function = strings[0]; + //需要补全的字段的key + String resultKeyName = strings[1]; + //原始日志字段key + String logsKeyName = strings[2]; + //原始日志字段对应的值 + Object logsKeyValue = JsonParseUtil.getValue(object, strings[2]); + //额外的参数的值 + String parameters = strings[3]; + + switch (function) { + case "dismantling": + if (StringUtil.isNotBlank(parameters)) { + if (logsKeyValue != null) { + JsonParseUtil.setValue(message, logsKeyName, dismantlingUtils(parameters, logsKeyValue)); + } + } + break; + case "combination": + if (StringUtil.isNotBlank(parameters)) { + if (logsKeyValue != null) { + combinationUtils(dimensionsObj, object, parameters, resultKeyName, logsKeyName); + } + } + break; + case "hierarchy": +// collector.emit(new Values(JsonParseUtil.getValue(object, logsKeyName), JsonMapper.toJsonString(dimensionsObj), JsonMapper.toJsonString(object))); + return new Tuple3<>(JsonParseUtil.getString(object, logsKeyName), JsonMapper.toJsonString(dimensionsObj), JsonMapper.toJsonString(object)); + default: + break; + } + } + } + } + } catch (RuntimeException e) { + logger.error("Map Parse error,message:" + e); + return new Tuple3<>("", "", ""); + } + return new Tuple3<>("", "", ""); + } + + /** + * alignment ID替换操作 + * 根据缓存中的AppId对应信息,获取当前AppId对应的具体名称。 + * + * @param parameters 参数集 + * @param fieldName 原始日志列名 + */ + private static String dismantlingUtils(String parameters, Object fieldName) { + String[] alignmentPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER); + int digits = Integer.parseInt(alignmentPars[0]); + return fieldName.toString().split(StreamAggregateConfig.PROTOCOL_SPLITTER)[digits]; + } + + /** + * combination 拼接操作 + * 获取方法函数中 parameters 字段,结构 "parameters": "abc,/" ;abc为要拼接字段 /为拼接的分隔符 + * + * @param parameters 参数集 + * @param message 原始日志 + * @param fieldName 原始日志列名 + */ + private static void combinationUtils(Map<String, Object> dimensions, Map<String, Object> message, String parameters, String resultKeyName, String fieldName) { + String[] combinationPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER); + Object combinationField = JsonParseUtil.getValue(message, combinationPars[0]); + if (combinationField != null) { + String separator = combinationPars[1]; + Object fieldNameValue = JsonParseUtil.getValue(message, fieldName); + if (fieldNameValue != null) { + String combinationValue = fieldNameValue + separator + combinationField; + dimensions.put(resultKeyName, combinationValue); + JsonParseUtil.setValue(message, fieldName, combinationValue); + } else { + dimensions.put(resultKeyName, combinationField); + JsonParseUtil.setValue(message, fieldName, combinationField); + + } + } + } + +} diff --git a/src/main/java/com/zdjizhi/utils/functions/MyTimeAssigner.java b/src/main/java/com/zdjizhi/utils/functions/MyTimeAssigner.java new file mode 100644 index 0000000..4ba139f --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/MyTimeAssigner.java @@ -0,0 +1,22 @@ +package com.zdjizhi.utils.functions; + +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.json.JsonParseUtil; +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; + +import java.util.Map; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/7/2117:32 + */ +public class MyTimeAssigner implements SerializableTimestampAssigner<String> { + @Override + public long extractTimestamp(String element, long recordTimestamp) { + Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(element, Map.class); + + return JsonParseUtil.getLong(object,"common_end_time"); + } +} diff --git a/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java new file mode 100644 index 0000000..d458984 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java @@ -0,0 +1,43 @@ +package com.zdjizhi.utils.functions; + +import com.zdjizhi.common.StreamAggregateConfig; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.json.JsonParseUtil; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.util.Collector; + +import java.util.Map; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/7/2114:52 + */ +public class ResultFlatMapFunction implements FlatMapFunction<String, String> { + private static String[] jobList = JsonParseUtil.getHierarchy(); + + @Override + @SuppressWarnings("unchecked") + public void flatMap(String value, Collector out) throws Exception { + StringBuffer stringBuffer = new StringBuffer(); + String name = jobList[0]; + Map<String, Object> jsonObject = (Map<String, Object>) JsonMapper.fromJsonString(value, Map.class); + String protocol = JsonParseUtil.getString(jsonObject, name); + if (StringUtil.isNotBlank(protocol)) { + String[] protocolIds = protocol.split(StreamAggregateConfig.PROTOCOL_SPLITTER); + for (String proto : protocolIds) { + if (StringUtil.isBlank(stringBuffer.toString())) { + stringBuffer.append(proto); + jsonObject.put(name, stringBuffer.toString()); + out.collect(JsonMapper.toJsonString(jsonObject)); + } else { + stringBuffer.append(jobList[1]).append(proto); + jsonObject.put(name, stringBuffer.toString()); + out.collect(JsonMapper.toJsonString(jsonObject)); + } + } + } + } +} diff --git a/src/main/java/com/zdjizhi/utils/general/MetricFunctions.java b/src/main/java/com/zdjizhi/utils/general/MetricFunctions.java new file mode 100644 index 0000000..5417236 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/general/MetricFunctions.java @@ -0,0 +1,37 @@ +package com.zdjizhi.utils.general; + + +import com.zdjizhi.utils.json.JsonTypeUtils; + +/** + * @author qidaijie + * @Package com.zdjizhi.storm.utils.general + * @Description: + * @date 2021/7/2015:31 + */ +public class MetricFunctions { + /** + * Long类型的数据求和 + * + * @param value1 第一个值 + * @param value2 第二个值 + * @return value1 + value2 + */ + public static Long longSum(Object value1, Object value2) { + Long res1 = JsonTypeUtils.checkLongValue(value1); + Long res2 = JsonTypeUtils.checkLongValue(value2); + + return res1 + res2; + } + + /** + * 计算Count + * + * @param count 当前count值 + * @return count+1 + */ + public static Long count(Object count) { + + return JsonTypeUtils.checkLongValue(count) + 1L; + } +} diff --git a/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java b/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java new file mode 100644 index 0000000..5ab46e6 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java @@ -0,0 +1,97 @@ +package com.zdjizhi.utils.general; + + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.json.JsonParseUtil; + +import java.util.HashMap; +import java.util.Map; + +/** + * @ClassNameAggregateUtils + * @Author [email protected] + * @Date2020/6/23 14:04 + * @Version V1.0 + **/ +public class ParseFunctions { + /** + * 获取filters条件map + */ + private static HashMap<String, String> filtersMap = JsonParseUtil.getFiltersMap(); + + + /** + * 解析 dimensions 字段集 + * + * @param dimensions 维度集 + * @param message 原始日志 + * @return 结果维度集 + */ + public static Map<String, Object> transDimensions(Map<String, String> dimensions, Map<String, Object> message) { + HashMap<String, Object> dimensionsObj = new HashMap<>(16); + + for (String dimension : dimensions.keySet()) { + dimensionsObj.put(dimension, JsonParseUtil.getValue(message, dimensions.get(dimension))); + } + + return dimensionsObj; + } + + /** + * 构建filters过滤函数,根据Schema指定的函数对日志进行过滤 + * + * @param object 原始日志 + * @return true or false + */ + public static boolean filterLogs(Map<String, Object> object) { + boolean available = false; + + for (String key : filtersMap.keySet()) { + switch (key) { + case "notempty": + Object value = JsonParseUtil.getValue(object, filtersMap.get(key)); + if (value != null && StringUtil.isNotBlank(value.toString())) { + available = true; + } + break; + default: + } + } + return available; + } + +// /** +// * 更新缓存中的对应关系map +// * +// * @param hashMap 当前缓存对应关系map +// */ +// public static void updateAppRelation(HashMap<Integer, String> hashMap) { +// try { +// Long begin = System.currentTimeMillis(); +// String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.APP_ID_HTTP); +// if (StringUtil.isNotBlank(schema)) { +// String data = JSONObject.parseObject(schema).getString("data"); +// JSONArray objects = JSONArray.parseArray(data); +// for (Object object : objects) { +// JSONArray jsonArray = JSONArray.parseArray(object.toString()); +// int key = jsonArray.getInteger(0); +// String value = jsonArray.getString(1); +// if (hashMap.containsKey(key)) { +// if (!value.equals(hashMap.get(key))) { +// hashMap.put(key, value); +// } +// } else { +// hashMap.put(key, value); +// } +// } +// logger.warn("更新缓存对应关系用时:" + (begin - System.currentTimeMillis())); +// logger.warn("更新缓存中的对应的APP关系,拉取接口数据长度:[" + objects.size()); +// } +// } catch (RuntimeException e) { +// logger.error("更新缓存APP-ID失败,异常:" + e); +// } +// } + +} diff --git a/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java new file mode 100644 index 0000000..1adb1d1 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java @@ -0,0 +1,77 @@ +package com.zdjizhi.utils.http; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import org.apache.commons.io.IOUtils; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + +/** + * 获取网关schema的工具类 + * + * @author qidaijie + */ +public class HttpClientUtil { + private static final Log logger = LogFactory.get(); + + /** + * 请求网关获取schema + * + * @param http 网关url + * @return schema + */ + public static String requestByGetMethod(String http) { + CloseableHttpClient httpClient = HttpClients.createDefault(); + StringBuilder entityStringBuilder; + + HttpGet get = new HttpGet(http); + BufferedReader bufferedReader = null; + CloseableHttpResponse httpResponse = null; + try { + httpResponse = httpClient.execute(get); + HttpEntity entity = httpResponse.getEntity(); + entityStringBuilder = new StringBuilder(); + if (null != entity) { + bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024); + int intC; + while ((intC = bufferedReader.read()) != -1) { + char c = (char) intC; + if (c == '\n') { + break; + } + entityStringBuilder.append(c); + } + + return entityStringBuilder.toString(); + } + } catch (IOException e) { + logger.error("Get Schema from Query engine ERROR! Exception message is:" + e); + } finally { + if (httpClient != null) { + try { + httpClient.close(); + } catch (IOException e) { + logger.error("Close HTTP Client ERROR! Exception messgae is:" + e); + } + } + if (httpResponse != null) { + try { + httpResponse.close(); + } catch (IOException e) { + logger.error("Close httpResponse ERROR! Exception messgae is:" + e); + } + } + if (bufferedReader != null) { + IOUtils.closeQuietly(bufferedReader); + } + } + return ""; + } +} diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java new file mode 100644 index 0000000..4a6a01c --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java @@ -0,0 +1,357 @@ +package com.zdjizhi.utils.json; + + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.JsonPath; +import com.zdjizhi.common.StreamAggregateConfig; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.http.HttpClientUtil; +import net.sf.cglib.beans.BeanGenerator; +import net.sf.cglib.beans.BeanMap; + +import java.util.*; + +/** + * 使用FastJson解析json的工具类 + * + * @author qidaijie + */ +public class JsonParseUtil { + + private static final Log logger = LogFactory.get(); + + /** + * 模式匹配,给定一个类型字符串返回一个类类型 + * + * @param type 类型 + * @return 类类型 + */ + + public static Class getClassName(String type) { + Class clazz; + + switch (type) { + case "int": + clazz = Integer.class; + break; + case "string": + clazz = String.class; + break; + case "long": + clazz = long.class; + break; + case "array": + clazz = List.class; + break; + case "double": + clazz = double.class; + break; + case "float": + clazz = float.class; + break; + case "char": + clazz = char.class; + break; + case "byte": + clazz = byte.class; + break; + case "boolean": + clazz = boolean.class; + break; + case "short": + clazz = short.class; + break; + default: + clazz = String.class; + } + return clazz; + } + + + /** + * 获取属性值的方法 + * + * @param obj 对象 + * @param property key + * @return 属性的值 + */ + public static Object getValue(Object obj, String property) { + + try { + BeanMap beanMap = BeanMap.create(obj); + if (beanMap.containsKey(property)) { + return beanMap.get(property); + } else { + return null; + } + } catch (RuntimeException e) { + logger.error("获取json-value异常,异常key:" + property + "异常信息为:" + e); + return null; + } + } + + /** + * 获取属性值的方法 + * + * @param jsonMap 原始日志 + * @param property key + * @return 属性的值 + */ + public static Object getValue(Map<String, Object> jsonMap, String property) { + try { + return jsonMap.getOrDefault(property, null); + } catch (RuntimeException e) { + logger.error("获取json-value异常,异常key:" + property + "异常信息为:" + e); + return null; + } + } + + /** + * long 类型检验转换方法,若为空返回基础值 + * + * @return Long value + */ + public static Long getLong(Map<String, Object> jsonMap, String property) { + Object value = jsonMap.getOrDefault(property, null); + Long longVal = TypeUtils.castToLong(value); + + if (longVal == null) { + return 0L; + } + + return longVal; + } + + public static String getString(Map<String, Object> jsonMap, String property) { + Object value = jsonMap.getOrDefault(property, null); + if (value == null) { + return null; + } + + if (value instanceof Map) { + return JsonMapper.toJsonString(value); + } + + if (value instanceof List) { + return JsonMapper.toJsonString(value); + } + + return value.toString(); + } + + /** + * 更新属性值的方法 + * + * @param jsonMap 原始日志json map + * @param property 更新的key + * @param value 更新的值 + */ + public static void setValue(Map<String, Object> jsonMap, String property, Object value) { + try { + jsonMap.put(property, value); + } catch (RuntimeException e) { + logger.error("赋予实体类错误类型数据", e); + } + } + + /** + * 更新属性值的方法 + * + * @param obj 对象 + * @param property 更新的key + * @param value 更新的值 + */ + public static void setValue(Object obj, String property, Object value) { + try { + BeanMap beanMap = BeanMap.create(obj); + beanMap.put(property, value); + } catch (ClassCastException e) { + logger.error("赋予实体类错误类型数据", e); + } + } + + /** + * 根据反射生成对象的方法 + * + * @param properties 反射类用的map + * @return 生成的Object类型的对象 + */ + public static Object generateObject(Map properties) { + BeanGenerator generator = new BeanGenerator(); + Set keySet = properties.keySet(); + for (Object aKeySet : keySet) { + String key = (String) aKeySet; + generator.addProperty(key, (Class) properties.get(key)); + } + return generator.create(); + } + + /** + * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 + * + * @return 用于反射生成schema类型的对象的一个map集合 + */ + public static HashMap<String, String[]> getActionMap() { + HashMap<String, String[]> map = new HashMap<>(16); + + String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); + DocumentContext parse = JsonPath.parse(schema); + + List<Object> actions = parse.read("$.data.doc.action[*]"); + + for (Object action : actions) { + map.put(JsonPath.read(action, "$.label"), + JsonPath.read(action, "$.metrics").toString().split(StreamAggregateConfig.FORMAT_SPLITTER)); +// System.out.println(JsonPath.read(action, "$.label")+JsonPath.read(action, "$.metrics").toString()); + } + + return map; + } + + /** + * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 + * + * @return 用于反射生成schema类型的对象的一个map集合 + */ + public static HashMap<String, String[]> getMetricsMap() { + HashMap<String, String[]> map = new HashMap<>(16); + + String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); + DocumentContext parse = JsonPath.parse(schema); + + List<Object> metrics = parse.read("$.data.doc.metrics[*]"); + + for (Object metric : metrics) { + map.put(JsonPath.read(metric, "$.name"), + new String[]{JsonPath.read(metric, "$.function"), JsonPath.read(metric, "$.fieldName")} + ); + } + return map; + } + + /** + * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 + * + * @return 用于反射生成schema类型的对象的一个map集合 + */ + public static String getTimeKey() { + + String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); + + return JsonPath.read(schema, "$.data.doc.timestamp.name"); + } + + + /** + * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 + * + * @return 用于反射生成schema类型的对象的一个map集合 + */ + public static HashMap<String, Class> getResultLogMap() { + HashMap<String, Class> map = new HashMap<>(16); + + String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); + DocumentContext parse = JsonPath.parse(schema); + + List<Object> dimensions = parse.read("$.data.doc.dimensions[*]"); + + for (Object dimension : dimensions) { + map.put(JsonPath.read(dimension, "$.name"), + JsonParseUtil.getClassName(JsonPath.read(dimension, "$.type"))); + } + + List<Object> metrics = parse.read("$.data.doc.metrics[*]"); + for (Object metric : metrics) { + map.put(JsonPath.read(metric, "$.name"), + JsonParseUtil.getClassName(JsonPath.read(metric, "$.type"))); + } + + return map; + } + + /** + * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 + * + * @return 用于反射生成schema类型的对象的一个map集合 + */ + public static HashMap<String, String> getDimensionsMap() { + HashMap<String, String> map = new HashMap<>(16); + + String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); + DocumentContext parse = JsonPath.parse(schema); + + List<Object> dimensions = parse.read("$.data.doc.dimensions[*]"); + + for (Object dimension : dimensions) { + map.put(JsonPath.read(dimension, "$.name"), + JsonPath.read(dimension, "$.fieldName")); + } + + return map; + } + + + /** + * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 + * + * @return 用于反射生成schema类型的对象的一个map集合 + */ + public static HashMap<String, String> getFiltersMap() { + HashMap<String, String> map = new HashMap<>(16); + String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); + DocumentContext parse = JsonPath.parse(schema); + + List<Object> filters = parse.read("$.data.doc.filters[*]"); + for (Object filter : filters) { + map.put(JsonPath.read(filter, "$.type"), JsonPath.read(filter, "$.fieldName")); + } + + return map; + } + + + /** + * 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList paramlist) + * + * @return 任务列表 + */ + public static ArrayList<String[]> getTransformsList() { + ArrayList<String[]> list = new ArrayList<>(); + String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); + DocumentContext parse = JsonPath.parse(schema); + + List<Object> transforms = parse.read("$.data.doc.transforms[*]"); + for (Object transform : transforms) { + String function = JsonPath.read(transform, "$.function").toString(); + String name = JsonPath.read(transform, "$.name").toString(); + String fieldName = JsonPath.read(transform, "$.fieldName").toString(); + String parameters = JsonPath.read(transform, "$.parameters").toString(); + list.add(new String[]{function, name, fieldName, parameters}); + } + + return list; + } + + /** + * 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList paramlist) + * + * @return 任务列表 + */ + public static String[] getHierarchy() { + String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP); + DocumentContext parse = JsonPath.parse(schema); + List<Object> transforms = parse.read("$.data.doc.transforms[*]"); + for (Object transform : transforms) { + String function = JsonPath.read(transform, "$.function").toString(); + if ("hierarchy".equals(function)) { + String name = JsonPath.read(transform, "$.name").toString(); + String parameters = JsonPath.read(transform, "$.parameters").toString(); + return new String[]{name, parameters}; + } + } + return null; + } + +}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java new file mode 100644 index 0000000..034f76a --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java @@ -0,0 +1,142 @@ +package com.zdjizhi.utils.json; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.exception.AnalysisException; + +import java.util.List; +import java.util.Map; + +/** + * @author qidaijie + * @Package PACKAGE_NAME + * @Description: + * @date 2021/7/1217:34 + */ +public class JsonTypeUtils { + private static final Log logger = LogFactory.get(); + /** + * String 类型检验转换方法 + * + * @param value json value + * @return String value + */ + public static String checkString(Object value) { + if (value == null) { + return null; + } + + if (value instanceof Map) { + return JsonMapper.toJsonString(value); + } + + if (value instanceof List) { + return JsonMapper.toJsonString(value); + } + + return value.toString(); + } + + /** + * array 类型检验转换方法 + * + * @param value json value + * @return List value + */ + private static Map checkObject(Object value) { + if (value == null) { + return null; + } + + if (value instanceof Map) { + return (Map) value; + } + + throw new AnalysisException("can not cast to map, value : " + value); + } + + /** + * array 类型检验转换方法 + * + * @param value json value + * @return List value + */ + private static List checkArray(Object value) { + if (value == null) { + return null; + } + + if (value instanceof List) { + return (List) value; + } + + throw new AnalysisException("can not cast to List, value : " + value); + } + + private static Long checkLong(Object value) { + if (value == null) { + return null; + } + + return TypeUtils.castToLong(value); + } + + /** + * long 类型检验转换方法,若为空返回基础值 + * + * @param value json value + * @return Long value + */ + public static long checkLongValue(Object value) { + + Long longVal = TypeUtils.castToLong(value); + + if (longVal == null) { + return 0L; + } + + return longVal; + } + + /** + * Double 类型校验转换方法 + * + * @param value json value + * @return Double value + */ + private static Double checkDouble(Object value) { + if (value == null) { + return null; + } + + return TypeUtils.castToDouble(value); + } + + + private static Integer checkInt(Object value) { + if (value == null) { + return null; + } + + return TypeUtils.castToInt(value); + } + + + /** + * int 类型检验转换方法,若为空返回基础值 + * + * @param value json value + * @return int value + */ + private static int getIntValue(Object value) { + + Integer intVal = TypeUtils.castToInt(value); + + if (intVal == null) { + return 0; + } + return intVal; + } + +} diff --git a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java new file mode 100644 index 0000000..01e8540 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java @@ -0,0 +1,180 @@ +package com.zdjizhi.utils.json; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.StreamAggregateConfig; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.exception.AnalysisException; + + +/** + * @author qidaijie + * @Package PACKAGE_NAME + * @Description: + * @date 2021/7/1218:20 + */ +public class TypeUtils { + private static final Log logger = LogFactory.get(); + + /** + * Integer 类型判断方法 + * + * @param value json value + * @return Integer value or null + */ + public static Object castToIfFunction(Object value) { + if (value == null) { + return null; + } + + if (value instanceof String) { + return value.toString(); + } + + if (value instanceof Integer) { + return ((Number) value).intValue(); + } + + if (value instanceof Long) { + return ((Number) value).longValue(); + } + +// if (value instanceof Map) { +// return (Map) value; +// } +// +// if (value instanceof List) { +// return Collections.singletonList(value.toString()); +// } + + if (value instanceof Boolean) { + return (Boolean) value ? 1 : 0; + } + + throw new AnalysisException("can not cast to int, value : " + value); + } + + /** + * Integer 类型判断方法 + * + * @param value json value + * @return Integer value or null + */ + static Integer castToInt(Object value) { + + if (value == null) { + return null; + } + + if (value instanceof Integer) { + return (Integer) value; + } + + //此判断数值超范围不抛出异常,会截取成对应类型数值 +// if (value instanceof Number) { +// return ((Number) value).intValue(); +// } + + if (value instanceof String) { + String strVal = (String) value; + if (StringUtil.isBlank(strVal)) { + return null; + } + + //将 10,20 类数据转换为10 + if (strVal.contains(StreamAggregateConfig.FORMAT_SPLITTER)) { + strVal = strVal.split(StreamAggregateConfig.FORMAT_SPLITTER)[0]; + } + + try { + return Integer.parseInt(strVal); + } catch (NumberFormatException ex) { + logger.error("String change Integer Error,The error Str is:" + strVal); + } + } + + if (value instanceof Boolean) { + return (Boolean) value ? 1 : 0; + } + + throw new AnalysisException("can not cast to int, value : " + value); + } + + /** + * Double类型判断方法 + * + * @param value json value + * @return double value or null + */ + static Double castToDouble(Object value) { + + if (value instanceof Double) { + return (Double) value; + } + + //此判断数值超范围不抛出异常,会截取成对应类型数值 +// if (value instanceof Number) { +// return ((Number) value).doubleValue(); +// } + + if (value instanceof String) { + String strVal = (String) value; + + if (StringUtil.isBlank(strVal)) { + return null; + } + + //将 10,20 类数据转换为10 + if (strVal.contains(StreamAggregateConfig.FORMAT_SPLITTER)) { + strVal = strVal.split(StreamAggregateConfig.FORMAT_SPLITTER)[0]; + } + + try { + return Double.parseDouble(strVal); + } catch (NumberFormatException ex) { + logger.error("String change Double Error,The error Str is:" + strVal); + } + } + + throw new AnalysisException("can not cast to double, value : " + value); + } + + /** + * Long类型判断方法 + * + * @param value json value + * @return (Long)value or null + */ + static Long castToLong(Object value) { + if (value == null) { + return null; + } + +// 此判断数值超范围不抛出异常,会截取成对应类型数值 + if (value instanceof Number) { + return ((Number) value).longValue(); + } + + if (value instanceof String) { + String strVal = (String) value; + + if (StringUtil.isBlank(strVal)) { + return null; + } + + //将 10,20 类数据转换为10 + if (strVal.contains(StreamAggregateConfig.FORMAT_SPLITTER)) { + strVal = strVal.split(StreamAggregateConfig.FORMAT_SPLITTER)[0]; + } + + try { + return Long.parseLong(strVal); + } catch (NumberFormatException ex) { + logger.error("String change Long Error,The error Str is:" + strVal); + } + } + + throw new AnalysisException("can not cast to long, value : " + value); + } + +} diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java new file mode 100644 index 0000000..2608187 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java @@ -0,0 +1,36 @@ +package com.zdjizhi.utils.kafka; + +import com.zdjizhi.common.StreamAggregateConfig; +import org.apache.kafka.common.config.SslConfigs; + +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2021/9/610:37 + */ +class CertUtils { + static void chooseCert(String type, Properties properties) { + switch (type) { + case "SSL": + properties.put("security.protocol", "SSL"); + properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); + properties.put("ssl.keystore.location", StreamAggregateConfig.TOOLS_LIBRARY + "keystore.jks"); + properties.put("ssl.keystore.password", StreamAggregateConfig.KAFKA_PIN); + properties.put("ssl.truststore.location", StreamAggregateConfig.TOOLS_LIBRARY + "truststore.jks"); + properties.put("ssl.truststore.password", StreamAggregateConfig.KAFKA_PIN); + properties.put("ssl.key.password", StreamAggregateConfig.KAFKA_PIN); + break; + case "SASL": + properties.put("security.protocol", "SASL_PLAINTEXT"); + properties.put("sasl.mechanism", "PLAIN"); + properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + + StreamAggregateConfig.KAFKA_USER + " password=" + StreamAggregateConfig.KAFKA_PIN + ";"); + break; + default: + } + + } +} diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java new file mode 100644 index 0000000..a24ab4e --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java @@ -0,0 +1,42 @@ +package com.zdjizhi.utils.kafka; + +import com.zdjizhi.common.StreamAggregateConfig; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.config.SslConfigs; + +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2021/6/813:54 + */ +public class Consumer { + private static Properties createConsumerConfig() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", StreamAggregateConfig.INPUT_KAFKA_SERVERS); + properties.put("group.id", StreamAggregateConfig.GROUP_ID); + properties.put("session.timeout.ms", "60000"); + properties.put("max.poll.records", 3000); + properties.put("max.partition.fetch.bytes", 31457280); + properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + + CertUtils.chooseCert(StreamAggregateConfig.KAFKA_SOURCE_PROTOCOL,properties); + + return properties; + } + + public static FlinkKafkaConsumer<String> getKafkaConsumer() { + FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(StreamAggregateConfig.INPUT_KAFKA_TOPIC, + new SimpleStringSchema(), createConsumerConfig()); + + kafkaConsumer.setCommitOffsetsOnCheckpoints(true); + kafkaConsumer.setStartFromGroupOffsets(); + + return kafkaConsumer; + } +} diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/Producer.java new file mode 100644 index 0000000..65330b5 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/kafka/Producer.java @@ -0,0 +1,53 @@ +package com.zdjizhi.utils.kafka; + +import com.zdjizhi.common.StreamAggregateConfig; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.SslConfigs; + +import java.util.Optional; +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2021/6/814:04 + */ +public class Producer { + + private static Properties createProducerConfig() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", StreamAggregateConfig.OUTPUT_KAFKA_SERVERS); + properties.put("acks", StreamAggregateConfig.PRODUCER_ACK); + properties.put("retries", StreamAggregateConfig.RETRIES); + properties.put("linger.ms", StreamAggregateConfig.LINGER_MS); + properties.put("request.timeout.ms", StreamAggregateConfig.REQUEST_TIMEOUT_MS); + properties.put("batch.size", StreamAggregateConfig.BATCH_SIZE); + properties.put("buffer.memory", StreamAggregateConfig.BUFFER_MEMORY); + properties.put("max.request.size", StreamAggregateConfig.MAX_REQUEST_SIZE); + properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, StreamAggregateConfig.PRODUCER_KAFKA_COMPRESSION_TYPE); + + CertUtils.chooseCert(StreamAggregateConfig.KAFKA_SINK_PROTOCOL, properties); + + return properties; + } + + + public static FlinkKafkaProducer<String> getKafkaProducer() { + FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>( + StreamAggregateConfig.OUTPUT_KAFKA_TOPIC, + new SimpleStringSchema(), + createProducerConfig(), Optional.empty()); + + //启用此选项将使生产者仅记录失败日志而不是捕获和重新抛出它们 + kafkaProducer.setLogFailuresOnly(false); + + //写入kafka的消息携带时间戳 +// kafkaProducer.setWriteTimestampToKafka(true); + + + return kafkaProducer; + } +} diff --git a/src/main/java/com/zdjizhi/utils/system/StreamAggregateConfigurations.java b/src/main/java/com/zdjizhi/utils/system/StreamAggregateConfigurations.java new file mode 100644 index 0000000..3ea18a5 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/system/StreamAggregateConfigurations.java @@ -0,0 +1,70 @@ +package com.zdjizhi.utils.system; + +import com.zdjizhi.utils.StringUtil; + +import java.io.IOException; +import java.util.Locale; +import java.util.Properties; + + +/** + * @author Administrator + */ + +public final class StreamAggregateConfigurations { + + private static Properties propKafka = new Properties(); + private static Properties propService = new Properties(); + + + public static String getStringProperty(Integer type, String key) { + if (type == 0) { + return propService.getProperty(key); + } else if (type == 1) { + return propKafka.getProperty(key); + } else { + return null; + } + + } + + public static Integer getIntProperty(Integer type, String key) { + if (type == 0) { + return Integer.parseInt(propService.getProperty(key)); + } else if (type == 1) { + return Integer.parseInt(propKafka.getProperty(key)); + } else { + return null; + } + } + + public static Long getLongProperty(Integer type, String key) { + if (type == 0) { + return Long.parseLong(propService.getProperty(key)); + } else if (type == 1) { + return Long.parseLong(propKafka.getProperty(key)); + } else { + return null; + } + } + + public static Boolean getBooleanProperty(Integer type, String key) { + if (type == 0) { + return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true"); + } else if (type == 1) { + return StringUtil.equals(propKafka.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true"); + } else { + return null; + } + } + + static { + try { + propService.load(StreamAggregateConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties")); + propKafka.load(StreamAggregateConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties")); + } catch (IOException | RuntimeException e) { + propKafka = null; + propService = null; + } + } +} diff --git a/src/main/java/log4j.properties b/src/main/java/log4j.properties new file mode 100644 index 0000000..9d91936 --- /dev/null +++ b/src/main/java/log4j.properties @@ -0,0 +1,25 @@ +#Log4j +log4j.rootLogger=info,console,file +# 控制台日志设置 +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.Threshold=info +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n + +# 文件日志设置 +log4j.appender.file=org.apache.log4j.DailyRollingFileAppender +log4j.appender.file.Threshold=info +log4j.appender.file.encoding=UTF-8 +log4j.appender.file.Append=true +#路径请用相对路径,做好相关测试输出到应用目下 +log4j.appender.file.file=${nis.root}/log/galaxy-name.log +log4j.appender.file.DatePattern='.'yyyy-MM-dd +log4j.appender.file.layout=org.apache.log4j.PatternLayout +#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n +log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n +#MyBatis 配置,com.nis.web.dao是mybatis接口所在包 +log4j.logger.com.nis.web.dao=debug +#bonecp数据源配置 +log4j.category.com.jolbox=debug,console + + diff --git a/src/test/java/com/zdjizhi/FunctionsTest.java b/src/test/java/com/zdjizhi/FunctionsTest.java new file mode 100644 index 0000000..6e3a20b --- /dev/null +++ b/src/test/java/com/zdjizhi/FunctionsTest.java @@ -0,0 +1,33 @@ +package com.zdjizhi; + +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.json.JsonParseUtil; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2021/9/1714:22 + */ +public class FunctionsTest { + private static HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricsMap(); + + @Test + public void actionTest() { + HashMap<String, String[]> actionMap = JsonParseUtil.getActionMap(); + String[] metricNames = actionMap.getOrDefault("", actionMap.get("Default")); + System.out.println(actionMap.toString()); + System.out.println(Arrays.toString(metricNames)); + + + + } + + + +} |
