diff options
30 files changed, 3354 insertions, 0 deletions
@@ -0,0 +1,301 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>com.zdjizhi</groupId> + <artifactId>log-completion-schema</artifactId> + <version>20210728</version> + + <name>log-completion-schema</name> + <url>http://www.example.com</url> + + <repositories> + <repository> + <id>nexus</id> + <name>Team Nexus Repository</name> + <url>http://192.168.40.125:8099/content/groups/public</url> + </repository> + + <repository> + <id>maven-ali</id> + <url>http://maven.aliyun.com/nexus/content/groups/public/</url> + <releases> + <!--<enabled>true</enabled>--> + </releases> + <snapshots> + <!--<enabled>true</enabled>--> + <checksumPolicy>fail</checksumPolicy> + </snapshots> + </repository> + </repositories> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <flink.version>1.13.1</flink.version> + <hadoop.version>2.7.1</hadoop.version> + <kafka.version>1.0.0</kafka.version> + <hbase.version>2.2.3</hbase.version> + <scope.type>provided</scope.type> + <!--<scope.type>compile</scope.type>--> + </properties> + + <build> + <plugins> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>2.4.2</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <transformers> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>com.zdjizhi.topology.LogFlowWriteTopology</mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>io.github.zlika</groupId> + <artifactId>reproducible-build-maven-plugin</artifactId> + <version>0.2</version> + 
<executions> + <execution> + <goals> + <goal>strip-jar</goal> + </goals> + <phase>package</phase> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>2.3.2</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + </plugins> + <resources> + <resource> + <directory>properties</directory> + <includes> + <include>**/*.properties</include> + <include>**/*.xml</include> + </includes> + <filtering>false</filtering> + </resource> + + <resource> + <directory>src\main\java</directory> + <includes> + <include>log4j.properties</include> + </includes> + <filtering>false</filtering> + </resource> + </resources> + </build> + + <dependencies> + + <dependency> + <groupId>com.zdjizhi</groupId> + <artifactId>galaxy</artifactId> + <version>1.0.6</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>com.alibaba</groupId> + <artifactId>fastjson</artifactId> + <version>1.2.70</version> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table</artifactId> + <version>${flink.version}</version> + <type>pom</type> + <scope>${scope.type}</scope> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${flink.version}</version> + <scope>${scope.type}</scope> + </dependency> + + + <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java --> + <dependency> + <groupId>org.apache.flink</groupId> + 
<artifactId>flink-streaming-java_2.11</artifactId> + <version>${flink.version}</version> + <!--<scope>${scope.type}</scope>--> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_2.11</artifactId> + <version>${flink.version}</version> + <scope>${scope.type}</scope> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka_2.11</artifactId> + <version>${flink.version}</version> + <!--<scope>${scope.type}</scope>--> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${flink.version}</version> + <scope>${scope.type}</scope> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --> + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <version>3.4.10</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client --> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + <version>${hbase.version}</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common --> + <dependency> + <groupId>org.apache.hadoop</groupId> + 
<artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <version>${hadoop.version}</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>cglib</groupId> + <artifactId>cglib-nodep</artifactId> + <version>3.2.4</version> + </dependency> + + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <version>5.3.2</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.5.2</version> + </dependency> + + <dependency> + <groupId>com.jayway.jsonpath</groupId> + <artifactId>json-path</artifactId> + <version>2.4.0</version> + </dependency> + + <dependency> + <groupId>io.prometheus</groupId> + <artifactId>simpleclient_pushgateway</artifactId> + <version>0.9.0</version> + </dependency> + + <dependency> + <groupId>cn.hutool</groupId> + <artifactId>hutool-all</artifactId> + <version>5.5.2</version> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + + </dependencies> +</project> + diff --git a/properties/default_config.properties b/properties/default_config.properties new file mode 100644 index 0000000..d82130d --- /dev/null +++ 
b/properties/default_config.properties @@ -0,0 +1,29 @@ +#producer重试的次数设置 +retries=0 + +#含义是说一个Batch创建之后,最多过多久,不管这个Batch有没有写满,都必须发送出去 +linger.ms=10 + +#如果在超时之前未收到响应,客户端将在必要时重新发送请求 +request.timeout.ms=30000 + +#producer都是按照batch进行发送的,批次大小,默认:16384 +batch.size=262144 + +#Producer端用于缓存消息的缓冲区大小 +#64M +#buffer.memory=67108864 +#128M +buffer.memory=134217728 + +#用来控制生产者每次发送给Kafka服务器的请求的最大大小,默认1048576 +#5M +#max.request.size=5242880 +#10M +max.request.size=10485760 + +#hbase table name +hbase.table.name=subscriber_info + +#邮件默认编码 +mail.default.charset=UTF-8
\ No newline at end of file diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties new file mode 100644 index 0000000..9bb2f84 --- /dev/null +++ b/properties/service_flow_config.properties @@ -0,0 +1,74 @@ +#--------------------------------地址配置------------------------------# + +#管理kafka地址 +input.kafka.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092 + +#管理输出kafka地址 +output.kafka.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092 + +#zookeeper 地址 用于配置log_id +zookeeper.servers=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181 + +#hbase zookeeper地址 用于连接HBase +hbase.zookeeper.servers=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181 + +#--------------------------------HTTP/定位库------------------------------# +#定位库地址 +ip.library=/home/bigdata/topology/dat/ + +#网关的schema位置 +schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/connection_record_log + +#网关APP_ID 获取接口 +app.id.http=http://192.168.44.67:9999/open-api/appDicList + +#--------------------------------Kafka消费组信息------------------------------# + +#kafka 接收数据topic +input.kafka.topic=CONNECTION-RECORD-LOG + +#补全数据 输出 topic +output.kafka.topic=CONNECTION-RECORD-COMPLETED-LOG + +#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; +group.id=connection-record-flink-20210809 + +#生产者压缩模式 none or snappy +producer.kafka.compression.type=none + +#生产者ack +producer.ack=1 + +#接收自kafka的消费者 client-id +consumer.client.id=consumer-connection-record + +#回写给kafka的生产者 client-id +producer.client.id=producer-connection-record + +#--------------------------------topology配置------------------------------# + +#consumer 并行度 +consumer.parallelism=3 + +#map函数并行度 +map.parallelism=3 + +#producer 并行度 +producer.parallelism=3 + +#数据中心,取值范围(0-63) +data.center.id.num=0 + +#hbase 更新时间,如填写0则不更新缓存 +hbase.tick.tuple.freq.secs=180 + +#app_id 更新时间,如填写0则不更新缓存 +app.tick.tuple.freq.secs=0 + 
+#--------------------------------默认值配置------------------------------# + +#邮件默认编码 +mail.default.charset=UTF-8 + +#0不需要补全原样输出日志,1需要补全 +log.need.complete=1 diff --git a/src/main/java/com/zdjizhi/common/DefaultProConfig.java b/src/main/java/com/zdjizhi/common/DefaultProConfig.java new file mode 100644 index 0000000..b98ea53 --- /dev/null +++ b/src/main/java/com/zdjizhi/common/DefaultProConfig.java @@ -0,0 +1,21 @@ +package com.zdjizhi.common; + + +import com.zdjizhi.utils.system.FlowWriteConfigurations; + +/** + * @author Administrator + */ +public class DefaultProConfig { + + + public static final String RETRIES = FlowWriteConfigurations.getStringProperty(1, "retries"); + public static final String LINGER_MS = FlowWriteConfigurations.getStringProperty(1, "linger.ms"); + public static final Integer REQUEST_TIMEOUT_MS = FlowWriteConfigurations.getIntProperty(1, "request.timeout.ms"); + public static final Integer BATCH_SIZE = FlowWriteConfigurations.getIntProperty(1, "batch.size"); + public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory"); + public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size"); + public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name"); + + +}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java new file mode 100644 index 0000000..bf82757 --- /dev/null +++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java @@ -0,0 +1,59 @@ +package com.zdjizhi.common; + + +import com.zdjizhi.utils.system.FlowWriteConfigurations; + +/** + * @author Administrator + */ +public class FlowWriteConfig { + + public static final int IF_PARAM_LENGTH = 3; + public static final String VISIBILITY = "disabled"; + public static final String FORMAT_SPLITTER = ","; + public static final String IS_JSON_KEY_TAG = "$."; + public static final String IF_CONDITION_SPLITTER = "="; + public static final String MODEL = "remote"; + public static final String PROTOCOL_SPLITTER = "\\."; + /** + * System + */ + public static final Integer CONSUMER_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "consumer.parallelism"); + public static final Integer MAP_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "map.parallelism"); + public static final Integer PRODUCER_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "producer.parallelism"); + public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs"); + public static final Integer APP_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "app.tick.tuple.freq.secs"); + public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num"); + public static final Integer LOG_NEED_COMPLETE = FlowWriteConfigurations.getIntProperty(0, "log.need.complete"); + public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(0, "mail.default.charset"); + + + + /** + * kafka + */ + public static final String INPUT_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "input.kafka.servers"); + public static final String OUTPUT_KAFKA_SERVERS = 
FlowWriteConfigurations.getStringProperty(0, "output.kafka.servers"); + public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers"); + public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers"); + public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id"); + public static final String OUTPUT_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "output.kafka.topic"); + public static final String INPUT_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "input.kafka.topic"); + public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(0, "producer.ack"); + public static final String IP_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "ip.library"); + + /** + * kafka限流配置-20201117 + */ + public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type"); + public static final String CONSUMER_CLIENT_ID = FlowWriteConfigurations.getStringProperty(0, "consumer.client.id"); + public static final String PRODUCER_CLIENT_ID = FlowWriteConfigurations.getStringProperty(0, "producer.client.id"); + + /** + * http + */ + public static final String SCHEMA_HTTP = FlowWriteConfigurations.getStringProperty(0, "schema.http"); + public static final String APP_ID_HTTP = FlowWriteConfigurations.getStringProperty(0, "app.id.http"); + + +}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java new file mode 100644 index 0000000..a9b38ca --- /dev/null +++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java @@ -0,0 +1,56 @@ +package com.zdjizhi.topology; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.functions.FilterNullFunction; +import com.zdjizhi.utils.functions.MapCompletedFunction; +import com.zdjizhi.utils.kafka.Consumer; +import com.zdjizhi.utils.kafka.Producer; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +/** + * @author qidaijie + * @Package com.zdjizhi.topology + * @Description: + * @date 2021/5/2016:42 + */ +public class LogFlowWriteTopology { + private static final Log logger = LogFactory.get(); + + public static void main(String[] args) { + final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); + + //开启Checkpoint,interval用于指定checkpoint的触发间隔(单位milliseconds) +// environment.enableCheckpointing(5000); + + DataStreamSource<String> streamSource = environment.addSource(Consumer.getKafkaConsumer()) + .setParallelism(FlowWriteConfig.CONSUMER_PARALLELISM); + + if (FlowWriteConfig.LOG_NEED_COMPLETE == 1) { + //对原始日志进行处理补全转换等 + DataStream<String> cleaningLog = streamSource.map(new MapCompletedFunction()) + .name("TransFormLogs").setParallelism(FlowWriteConfig.MAP_PARALLELISM); + //过滤空数据不发送到Kafka内 + DataStream<String> result = cleaningLog.filter(new FilterNullFunction()).name("FilterAbnormalData"); + //发送数据到Kafka + result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka") + 
.setParallelism(FlowWriteConfig.PRODUCER_PARALLELISM); + } else { + DataStream<String> result = streamSource.filter(new FilterNullFunction()).name("FilterOriginalData"); + result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka").setParallelism(FlowWriteConfig.PRODUCER_PARALLELISM); + } + + try { + environment.execute(args[0]); + } catch (Exception e) { + logger.error("This Flink task start ERROR! Exception information is :" + e); + } + + } + + +} diff --git a/src/main/java/com/zdjizhi/utils/app/AppUtils.java b/src/main/java/com/zdjizhi/utils/app/AppUtils.java new file mode 100644 index 0000000..0caeb25 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/app/AppUtils.java @@ -0,0 +1,123 @@ +package com.zdjizhi.utils.app; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.http.HttpClientUtil; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * AppId 工具类 + * + * @author qidaijie + */ + +public class AppUtils { + private static final Log logger = LogFactory.get(); + private static Map<Integer, String> appIdMap = new ConcurrentHashMap<>(128); + private static AppUtils appUtils; + + private static void getAppInstance() { + appUtils = new AppUtils(); + } + + + /** + * 构造函数-新 + */ + private AppUtils() { + //定时更新 + updateAppIdCache(); + } + + /** + * 更新变量 + */ + private static void change() { + if (appUtils == null) { + getAppInstance(); + } + timestampsFilter(); + } + + + /** + * 获取变更内容 + */ + private static void timestampsFilter() { + try { + Long begin = System.currentTimeMillis(); + String schema = HttpClientUtil.requestByGetMethod(FlowWriteConfig.APP_ID_HTTP); + if 
(StringUtil.isNotBlank(schema)) { + String data = JSONObject.parseObject(schema).getString("data"); + JSONArray objects = JSONArray.parseArray(data); + for (Object object : objects) { + JSONArray jsonArray = JSONArray.parseArray(object.toString()); + int key = jsonArray.getInteger(0); + String value = jsonArray.getString(1); + if (appIdMap.containsKey(key)) { + if (!value.equals(appIdMap.get(key))) { + appIdMap.put(key, value); + } + } else { + appIdMap.put(key, value); + } + } + logger.warn("Updating the correspondence takes time:" + (begin - System.currentTimeMillis())); + logger.warn("Pull the length of the interface data:[" + objects.size() + "]"); + } + } catch (RuntimeException e) { + logger.error("Update cache app-id failed, exception:" + e); + } + } + + + /** + * 验证定时器,每隔一段时间验证一次-验证获取新的Cookie + */ + private void updateAppIdCache() { + ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1); + executorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + if (FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS != 0) { + change(); + } + } catch (RuntimeException e) { + logger.error("AppUtils update AppCache is error===>{" + e + "}<==="); + } + } + }, 1, FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS); + } + + + /** + * 获取 appName + * + * @param appId app_id + * @return account + */ + public static String getAppName(int appId) { + + if (appUtils == null) { + getAppInstance(); + } + + if (appIdMap.containsKey(appId)) { + return appIdMap.get(appId); + } else { + logger.warn("AppMap get appName is null, ID is :" + appId); + return ""; + } + } + +} diff --git a/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java b/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java new file mode 100644 index 0000000..67c88f0 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java @@ -0,0 +1,18 @@ +package com.zdjizhi.utils.exception; + +/** + * @author qidaijie + * 
@Package com.zdjizhi.storm.utils.execption + * @Description: + * @date 2021/3/259:42 + */ +public class FlowWriteException extends RuntimeException { + + public FlowWriteException() { + } + + public FlowWriteException(String message) { + super(message); + } + +} diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java new file mode 100644 index 0000000..de507ad --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java @@ -0,0 +1,17 @@ +package com.zdjizhi.utils.functions; + +import com.zdjizhi.utils.StringUtil; +import org.apache.flink.api.common.functions.FilterFunction; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/5/2715:01 + */ +public class FilterNullFunction implements FilterFunction<String> { + @Override + public boolean filter(String message) { + return StringUtil.isNotBlank(message); + } +} diff --git a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java new file mode 100644 index 0000000..5618159 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java @@ -0,0 +1,28 @@ +package com.zdjizhi.utils.functions; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.utils.general.TransFormTypeMap; +import org.apache.flink.api.common.functions.MapFunction; + + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/5/2715:01 + */ +public class MapCompletedFunction implements MapFunction<String, String> { + private static final Log logger = LogFactory.get(); + + @Override + @SuppressWarnings("unchecked") + public String map(String logs) { + try { + return TransFormTypeMap.dealCommonMessage(logs); + } catch (RuntimeException e) { + logger.error("解析补全日志信息过程异常,异常信息:" + e + "\n" + logs); + return ""; + } + } +} 
diff --git a/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java new file mode 100644 index 0000000..d203a2b --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java @@ -0,0 +1,213 @@ +package com.zdjizhi.utils.general; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.zookeeper.DistributedLock; +import com.zdjizhi.utils.zookeeper.ZookeeperUtils; + +/** + * 雪花算法 + * + * @author qidaijie + */ +public class SnowflakeId { + private static final Log logger = LogFactory.get(); + + /** + * 共64位 第一位为符号位 默认0 + * 时间戳 39位(17 year), centerId:(关联每个环境或任务数) :6位(0-63), + * workerId(关联进程):7(0-127) ,序列号:11位(2047/ms) + * + * 序列号 /ms = (-1L ^ (-1L << 11)) + * 最大使用年 = (1L << 39) / (1000L * 60 * 60 * 24 * 365) + */ + /** + * 开始时间截 (2020-11-14 00:00:00) max 17years + */ + private final long twepoch = 1605283200000L; + + /** + * 机器id所占的位数 + */ + private final long workerIdBits = 7L; + + /** + * 数据标识id所占的位数 + */ + private final long dataCenterIdBits = 6L; + + /** + * 支持的最大机器id,结果是63 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数) + * M << n = M * 2^n + */ + private final long maxWorkerId = -1L ^ (-1L << workerIdBits); + + /** + * 支持的最大数据标识id,结果是127 + */ + private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits); + + /** + * 序列在id中占的位数 + */ + private final long sequenceBits = 11L; + + /** + * 机器ID向左移12位 + */ + private final long workerIdShift = sequenceBits; + + /** + * 数据标识id向左移17位(14+6) + */ + private final long dataCenterIdShift = sequenceBits + workerIdBits; + + /** + * 时间截向左移22位(4+6+14) + */ + private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits; + + /** + * 生成序列的掩码,这里为2047 + */ + private final long sequenceMask = -1L ^ (-1L << sequenceBits); + + /** + * 工作机器ID(0~127) + */ + private long workerId; + + /** + * 数据中心ID(0~63) + */ + private long dataCenterId; + + /** + * 毫秒内序列(0~2047) + */ + 
private long sequence = 0L; + + /** + * 上次生成ID的时间截 + */ + private long lastTimestamp = -1L; + + + /** + * 设置允许时间回拨的最大限制10s + */ + private static final long rollBackTime = 10000L; + + + private static SnowflakeId idWorker; + + private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils(); + + static { + idWorker = new SnowflakeId(FlowWriteConfig.ZOOKEEPER_SERVERS, FlowWriteConfig.DATA_CENTER_ID_NUM); + } + + //==============================Constructors===================================== + + /** + * 构造函数 + */ + private SnowflakeId(String zookeeperIp, long dataCenterIdNum) { + DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1"); + try { + lock.lock(); + int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + "worker" + dataCenterIdNum, zookeeperIp); + if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) { + throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId)); + } + if (dataCenterIdNum > maxDataCenterId || dataCenterIdNum < 0) { + throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than ", maxDataCenterId)); + } + this.workerId = tmpWorkerId; + this.dataCenterId = dataCenterIdNum; + } catch (RuntimeException e) { + logger.error("This is not usual error!!!===>>>" + e + "<<<==="); + }finally { + lock.unlock(); + } + } + + // ==============================Methods========================================== + + /** + * 获得下一个ID (该方法是线程安全的) + * + * @return SnowflakeId + */ + private synchronized long nextId() { + long timestamp = timeGen(); + //设置一个允许回拨限制时间,系统时间回拨范围在rollBackTime内可以等待校准 + if (lastTimestamp - timestamp > 0 && lastTimestamp - timestamp < rollBackTime) { + timestamp = tilNextMillis(lastTimestamp); + } + //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常 + if (timestamp < lastTimestamp) { + throw new RuntimeException( + String.format("Clock moved backwards. 
Refusing to generate id for %d milliseconds", lastTimestamp - timestamp)); + } + + //如果是同一时间生成的,则进行毫秒内序列 + if (lastTimestamp == timestamp) { + sequence = (sequence + 1) & sequenceMask; + //毫秒内序列溢出 + if (sequence == 0) { + //阻塞到下一个毫秒,获得新的时间戳 + timestamp = tilNextMillis(lastTimestamp); + } + } + //时间戳改变,毫秒内序列重置 + else { + sequence = 0L; + } + + //上次生成ID的时间截 + lastTimestamp = timestamp; + + //移位并通过或运算拼到一起组成64位的ID + return ((timestamp - twepoch) << timestampLeftShift) + | (dataCenterId << dataCenterIdShift) + | (workerId << workerIdShift) + | sequence; + } + + /** + * 阻塞到下一个毫秒,直到获得新的时间戳 + * + * @param lastTimestamp 上次生成ID的时间截 + * @return 当前时间戳 + */ + protected long tilNextMillis(long lastTimestamp) { + long timestamp = timeGen(); + while (timestamp <= lastTimestamp) { + timestamp = timeGen(); + } + return timestamp; + } + + /** + * 返回以毫秒为单位的当前时间 + * + * @return 当前时间(毫秒) + */ + protected long timeGen() { + return System.currentTimeMillis(); + } + + + /** + * 静态工具类 + * + * @return + */ + public static Long generateId() { + return idWorker.nextId(); + } + + +}
package com.zdjizhi.utils.general;


import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.json.JsonParseUtil;

import java.util.ArrayList;
import java.util.Map;


/**
 * Log completion / transformation utility operating on a plain JSON map.
 *
 * @author qidaijie
 */
public class TransFormMap {
    private static final Log logger = LogFactory.get();

    /**
     * Transformation jobs loaded from the schema gateway. Each entry is a
     * four-element string array: (source field, target field, function name,
     * extra parameter), e.g.
     * (mail_subject, mail_subject, decode_of_base64, mail_subject_charset).
     */
    private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);

    /**
     * Parses one raw log line and completes it according to the job list.
     *
     * @param message raw JSON log message from the Kafka topic
     * @return completed JSON string, or "" when input is blank or processing fails
     */
    @SuppressWarnings("unchecked")
    public static String dealCommonMessage(String message) {
        if (StringUtil.isNotBlank(message)) {
            try {
                Map<String, Object> jsonMap = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
                for (String[] job : jobList) {
                    // value of the field the function reads from
                    Object sourceValue = JsonParseUtil.getValue(jsonMap, job[0]);
                    // key of the field to be completed
                    String targetKey = job[1];
                    // current value of the field to be completed
                    Object targetValue = JsonParseUtil.getValue(jsonMap, targetKey);
                    // job[2] = function name, job[3] = extra parameter
                    applyFunction(job[2], jsonMap, targetKey, targetValue, sourceValue, job[3]);
                }
                return JsonMapper.toJsonString(jsonMap);
            } catch (RuntimeException e) {
                logger.error("解析补全日志信息过程异常,异常信息:" + e + "\n" + message);
                return "";
            }
        }
        return "";
    }


    /**
     * Dispatches one transformation described by the schema onto the record.
     *
     * @param function    function name from the schema "format" block
     * @param jsonMap     parsed log record
     * @param targetKey   key of the field to be completed
     * @param targetValue current value of that field (may be null)
     * @param sourceValue value of the source field (may be null)
     * @param param       extra parameter from the schema (may be null)
     */
    private static void applyFunction(String function, Map<String, Object> jsonMap, String targetKey, Object targetValue, Object sourceValue, String param) {
        switch (function) {
            case "current_timestamp":
                if (!(targetValue instanceof Long)) {
                    JsonParseUtil.setValue(jsonMap, targetKey, TransFunction.getCurrentTime());
                }
                break;
            case "snowflake_id":
                JsonParseUtil.setValue(jsonMap, targetKey, SnowflakeId.generateId());
                break;
            case "geo_ip_detail":
                if (sourceValue != null && targetValue == null) {
                    JsonParseUtil.setValue(jsonMap, targetKey, TransFunction.getGeoIpDetail(sourceValue.toString()));
                }
                break;
            case "geo_asn":
                if (sourceValue != null && targetValue == null) {
                    JsonParseUtil.setValue(jsonMap, targetKey, TransFunction.getGeoAsn(sourceValue.toString()));
                }
                break;
            case "geo_ip_country":
                if (sourceValue != null && targetValue == null) {
                    JsonParseUtil.setValue(jsonMap, targetKey, TransFunction.getGeoIpCountry(sourceValue.toString()));
                }
                break;
            case "set_value":
                if (param != null) {
                    JsonParseUtil.setValue(jsonMap, targetKey, param);
                }
                break;
            case "get_value":
                if (sourceValue != null) {
                    JsonParseUtil.setValue(jsonMap, targetKey, sourceValue);
                }
                break;
            case "if":
                if (param != null) {
                    JsonParseUtil.setValue(jsonMap, targetKey, TransFunction.condition(jsonMap, param));
                }
                break;
            case "sub_domain":
                if (targetValue == null && sourceValue != null) {
                    JsonParseUtil.setValue(jsonMap, targetKey, TransFunction.getTopDomain(sourceValue.toString()));
                }
                break;
            case "radius_match":
                if (sourceValue != null) {
                    JsonParseUtil.setValue(jsonMap, targetKey, TransFunction.radiusMatch(sourceValue.toString()));
                }
                break;
            case "app_match":
                if (sourceValue != null && targetValue == null) {
                    JsonParseUtil.setValue(jsonMap, targetKey, TransFunction.appMatch(sourceValue.toString()));
                }
                break;
            case "decode_of_base64":
                if (sourceValue != null) {
                    JsonParseUtil.setValue(jsonMap, targetKey, TransFunction.decodeBase64(sourceValue.toString(), TransFunction.isJsonValue(jsonMap, param)));
                }
                break;
            case "flattenSpec":
                if (sourceValue != null && param != null) {
                    JsonParseUtil.setValue(jsonMap, targetKey, TransFunction.flattenSpec(sourceValue.toString(), param));
                }
                break;
            default:
        }
    }

}
package com.zdjizhi.utils.general;


import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.json.JsonParseUtil;

import java.util.ArrayList;
import java.util.HashMap;


/**
 * Log completion / transformation utility operating on a dynamically
 * generated POJO (cglib bean) instead of a raw map.
 *
 * @author qidaijie
 */
public class TransFormObject {
    private static final Log logger = LogFactory.get();

    /**
     * Field-name -> field-class map used to generate the reflective bean.
     */
    private static HashMap<String, Class> schemaFieldTypes = JsonParseUtil.getMapFromHttp(FlowWriteConfig.SCHEMA_HTTP);

    /**
     * Template bean instance; its class is reused to deserialize every message.
     */
    private static Object pojoTemplate = JsonParseUtil.generateObject(schemaFieldTypes);

    /**
     * Transformation jobs loaded from the schema gateway. Each entry is a
     * four-element string array: (source field, target field, function name,
     * extra parameter), e.g.
     * (mail_subject, mail_subject, decode_of_base64, mail_subject_charset).
     */
    private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);

    /**
     * Parses one raw log line into the dynamic POJO and completes it
     * according to the job list.
     *
     * @param message raw JSON log message from the Kafka topic
     * @return completed JSON string, or "" when input is blank or processing fails
     */
    public static String dealCommonMessage(String message) {
        if (StringUtil.isNotBlank(message)) {
            try {
                Object pojo = JsonMapper.fromJsonString(message, pojoTemplate.getClass());
                for (String[] job : jobList) {
                    // value of the field the function reads from
                    Object sourceValue = JsonParseUtil.getValue(pojo, job[0]);
                    // key of the field to be completed
                    String targetKey = job[1];
                    // current value of the field to be completed
                    Object targetValue = JsonParseUtil.getValue(pojo, targetKey);
                    // job[2] = function name, job[3] = extra parameter
                    applyFunction(job[2], pojo, targetKey, targetValue, sourceValue, job[3]);
                }
                return JsonMapper.toJsonString(pojo);
            } catch (RuntimeException e) {
                logger.error("解析补全日志信息过程异常,异常信息:" + e + "\n" + message);
                return "";
            }
        }
        return "";
    }


    /**
     * Dispatches one transformation described by the schema onto the POJO.
     *
     * @param function    function name from the schema "format" block
     * @param pojo        dynamic POJO holding the log record
     * @param targetKey   key of the field to be completed
     * @param targetValue current value of that field (may be null)
     * @param sourceValue value of the source field (may be null)
     * @param param       extra parameter from the schema (may be null)
     */
    private static void applyFunction(String function, Object pojo, String targetKey, Object targetValue, Object sourceValue, String param) {
        switch (function) {
            case "current_timestamp":
                if (!(targetValue instanceof Long)) {
                    JsonParseUtil.setValue(pojo, targetKey, TransFunction.getCurrentTime());
                }
                break;
            case "snowflake_id":
                JsonParseUtil.setValue(pojo, targetKey, SnowflakeId.generateId());
                break;
            case "geo_ip_detail":
                if (sourceValue != null && targetValue == null) {
                    JsonParseUtil.setValue(pojo, targetKey, TransFunction.getGeoIpDetail(sourceValue.toString()));
                }
                break;
            case "geo_asn":
                if (sourceValue != null && targetValue == null) {
                    JsonParseUtil.setValue(pojo, targetKey, TransFunction.getGeoAsn(sourceValue.toString()));
                }
                break;
            case "geo_ip_country":
                if (sourceValue != null && targetValue == null) {
                    JsonParseUtil.setValue(pojo, targetKey, TransFunction.getGeoIpCountry(sourceValue.toString()));
                }
                break;
            case "set_value":
                // NOTE: unlike the map-based variants, this one also requires a
                // non-null source value and routes through TransFunction.setValue
                // (numeric strings become Long for the typed bean).
                if (sourceValue != null && param != null) {
                    JsonParseUtil.setValue(pojo, targetKey, TransFunction.setValue(param));
                }
                break;
            case "get_value":
                if (sourceValue != null) {
                    JsonParseUtil.setValue(pojo, targetKey, sourceValue);
                }
                break;
            case "if":
                if (param != null) {
                    JsonParseUtil.setValue(pojo, targetKey, TransFunction.condition(pojo, param));
                }
                break;
            case "sub_domain":
                if (targetValue == null && sourceValue != null) {
                    JsonParseUtil.setValue(pojo, targetKey, TransFunction.getTopDomain(sourceValue.toString()));
                }
                break;
            case "radius_match":
                if (sourceValue != null) {
                    JsonParseUtil.setValue(pojo, targetKey, TransFunction.radiusMatch(sourceValue.toString()));
                }
                break;
            case "app_match":
                if (sourceValue != null && targetValue == null) {
                    JsonParseUtil.setValue(pojo, targetKey, TransFunction.appMatch(sourceValue.toString()));
                }
                break;
            case "decode_of_base64":
                if (sourceValue != null) {
                    JsonParseUtil.setValue(pojo, targetKey, TransFunction.decodeBase64(sourceValue.toString(), TransFunction.isJsonValue(pojo, param)));
                }
                break;
            case "flattenSpec":
                if (sourceValue != null && param != null) {
                    JsonParseUtil.setValue(pojo, targetKey, TransFunction.flattenSpec(sourceValue.toString(), param));
                }
                break;
            default:
        }
    }

}
package com.zdjizhi.utils.general;


import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.json.JsonParseUtil;
import com.zdjizhi.utils.json.JsonTypeUtils;

import java.util.ArrayList;
import java.util.Map;


/**
 * Log completion / transformation utility operating on a plain JSON map and
 * applying schema-driven type coercion (JsonTypeUtils) before serializing.
 *
 * @author qidaijie
 */
public class TransFormTypeMap {
    private static final Log logger = LogFactory.get();

    /**
     * Transformation jobs loaded from the schema gateway. Each entry is a
     * four-element string array: (source field, target field, function name,
     * extra parameter), e.g.
     * (mail_subject, mail_subject, decode_of_base64, mail_subject_charset).
     */
    private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);

    /**
     * Parses one raw log line, completes it according to the job list, then
     * normalizes every field to its schema type before re-serializing.
     *
     * @param message raw JSON log message from the Kafka topic
     * @return completed, type-normalized JSON string, or "" on blank input or failure
     */
    @SuppressWarnings("unchecked")
    public static String dealCommonMessage(String message) {
        if (StringUtil.isNotBlank(message)) {
            try {
                Map<String, Object> jsonMap = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
                for (String[] job : jobList) {
                    // value of the field the function reads from
                    Object sourceValue = JsonParseUtil.getValue(jsonMap, job[0]);
                    // key of the field to be completed
                    String targetKey = job[1];
                    // current value of the field to be completed
                    Object targetValue = JsonParseUtil.getValue(jsonMap, targetKey);
                    // job[2] = function name, job[3] = extra parameter
                    applyFunction(job[2], jsonMap, targetKey, targetValue, sourceValue, job[3]);
                }
                // coerce values to schema types before writing out
                return JsonMapper.toJsonString(JsonTypeUtils.typeTransform(jsonMap));
            } catch (RuntimeException e) {
                logger.error("解析补全日志信息过程异常,异常信息:" + e + "\n" + message);
                return "";
            }
        }
        return "";
    }


    /**
     * Dispatches one transformation described by the schema onto the record.
     *
     * @param function    function name from the schema "format" block
     * @param jsonMap     parsed log record
     * @param targetKey   key of the field to be completed
     * @param targetValue current value of that field (may be null)
     * @param sourceValue value of the source field (may be null)
     * @param param       extra parameter from the schema (may be null)
     */
    private static void applyFunction(String function, Map<String, Object> jsonMap, String targetKey, Object targetValue, Object sourceValue, String param) {
        switch (function) {
            case "current_timestamp":
                if (!(targetValue instanceof Long)) {
                    JsonParseUtil.setValue(jsonMap, targetKey, TransFunction.getCurrentTime());
                }
                break;
            case "snowflake_id":
                JsonParseUtil.setValue(jsonMap, targetKey, SnowflakeId.generateId());
                break;
            case "geo_ip_detail":
                if (sourceValue != null && targetValue == null) {
                    JsonParseUtil.setValue(jsonMap, targetKey, TransFunction.getGeoIpDetail(sourceValue.toString()));
                }
                break;
            case "geo_asn":
                if (sourceValue != null && targetValue == null) {
                    JsonParseUtil.setValue(jsonMap, targetKey, TransFunction.getGeoAsn(sourceValue.toString()));
                }
                break;
            case "geo_ip_country":
                if (sourceValue != null && targetValue == null) {
                    JsonParseUtil.setValue(jsonMap, targetKey, TransFunction.getGeoIpCountry(sourceValue.toString()));
                }
                break;
            case "set_value":
                if (param != null) {
                    JsonParseUtil.setValue(jsonMap, targetKey, param);
                }
                break;
            case "get_value":
                if (sourceValue != null) {
                    JsonParseUtil.setValue(jsonMap, targetKey, sourceValue);
                }
                break;
            case "if":
                if (param != null) {
                    JsonParseUtil.setValue(jsonMap, targetKey, TransFunction.condition(jsonMap, param));
                }
                break;
            case "sub_domain":
                if (targetValue == null && sourceValue != null) {
                    JsonParseUtil.setValue(jsonMap, targetKey, TransFunction.getTopDomain(sourceValue.toString()));
                }
                break;
            case "radius_match":
                if (sourceValue != null) {
                    JsonParseUtil.setValue(jsonMap, targetKey, TransFunction.radiusMatch(sourceValue.toString()));
                }
                break;
            case "app_match":
                if (sourceValue != null && targetValue == null) {
                    JsonParseUtil.setValue(jsonMap, targetKey, TransFunction.appMatch(sourceValue.toString()));
                }
                break;
            case "decode_of_base64":
                if (sourceValue != null) {
                    JsonParseUtil.setValue(jsonMap, targetKey, TransFunction.decodeBase64(sourceValue.toString(), TransFunction.isJsonValue(jsonMap, param)));
                }
                break;
            case "flattenSpec":
                if (sourceValue != null && param != null) {
                    JsonParseUtil.setValue(jsonMap, targetKey, TransFunction.flattenSpec(sourceValue.toString(), param));
                }
                break;
            default:
        }
    }

}
package com.zdjizhi.utils.general;

import cn.hutool.core.codec.Base64;
import cn.hutool.core.text.StrSpliter;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.jayway.jsonpath.InvalidPathException;
import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.common.DefaultProConfig;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.FormatUtils;
import com.zdjizhi.utils.IpLookup;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.app.AppUtils;
import com.zdjizhi.utils.hbase.HBaseUtils;
import com.zdjizhi.utils.json.JsonParseUtil;
import com.zdjizhi.utils.json.TypeUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Implementations of the schema "format" functions (timestamps, GeoIP,
 * base64 decoding, conditionals, ...). Package-private: only the
 * TransForm* dispatchers call these.
 *
 * @author qidaijie
 */
class TransFunction {

    private static final Log logger = LogFactory.get();

    /** Matches digit-only strings; note it also matches "" (guarded in setValue). */
    private static final Pattern PATTERN = Pattern.compile("[0-9]*");

    /**
     * IP geolocation lookup helper, backed by the mmdb files configured in
     * FlowWriteConfig.IP_LIBRARY.
     */
    private static IpLookup ipLookup = new IpLookup.Builder(false)
            .loadDataFileV4(FlowWriteConfig.IP_LIBRARY + "ip_v4.mmdb")
            .loadDataFileV6(FlowWriteConfig.IP_LIBRARY + "ip_v6.mmdb")
            .loadDataFilePrivateV4(FlowWriteConfig.IP_LIBRARY + "ip_private_v4.mmdb")
            .loadDataFilePrivateV6(FlowWriteConfig.IP_LIBRARY + "ip_private_v6.mmdb")
            .loadAsnDataFile(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb")
            .loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb")
            .build();

    /**
     * @return current Unix timestamp in seconds
     */
    static long getCurrentTime() {
        return System.currentTimeMillis() / 1000;
    }

    /**
     * Looks up detailed location info for an IP.
     *
     * @param ip client IP
     * @return detailed location description
     */
    static String getGeoIpDetail(String ip) {
        return ipLookup.cityLookupDetail(ip);
    }

    /**
     * Looks up ASN info for an IP.
     *
     * @param ip client/server IP
     * @return ASN
     */
    static String getGeoAsn(String ip) {
        return ipLookup.asnLookup(ip);
    }

    /**
     * Looks up the country for an IP.
     *
     * @param ip server IP
     * @return country
     */
    static String getGeoIpCountry(String ip) {
        return ipLookup.countryLookup(ip);
    }


    /**
     * Resolves a radius account from the HBase-backed cache.
     *
     * @param ip client IP
     * @return account, possibly null/blank when unknown
     */
    static String radiusMatch(String ip) {
        String account = HBaseUtils.getAccount(ip.trim());
        if (StringUtil.isBlank(account)) {
            logger.warn("HashMap get account is null, Ip is :" + ip);
        }
        return account;
    }

    /**
     * Maps the first app id in a separator-joined list to its app name.
     *
     * @param appIds app id list (joined by FORMAT_SPLITTER)
     * @return app name, or "" when the list is empty or malformed
     */
    static String appMatch(String appIds) {
        try {
            List<String> ids = StrSpliter.split(appIds, FlowWriteConfig.FORMAT_SPLITTER, true, true);
            // Fix: an empty/blank input used to yield an uncaught
            // IndexOutOfBoundsException from get(0).
            if (ids.isEmpty()) {
                logger.error("APP ID列表分割转换异常,异常APP ID列表:" + appIds);
                return "";
            }
            return AppUtils.getAppName(Integer.parseInt(ids.get(0)));
        } catch (NumberFormatException | ClassCastException exception) {
            logger.error("APP ID列表分割转换异常,异常APP ID列表:" + appIds);
            return "";
        }
    }

    /**
     * Extracts the top private domain from a domain name.
     *
     * @param domain original domain
     * @return top-level private domain, or "" on parse failure
     */
    static String getTopDomain(String domain) {
        try {
            return FormatUtils.getTopPrivateDomain(domain);
        } catch (StringIndexOutOfBoundsException outException) {
            logger.error("解析顶级域名异常,异常域名:" + domain);
            return "";
        }
    }

    /**
     * Decodes a base64 string with the given charset.
     *
     * @param message base64 payload
     * @param charset charset name, or null to use MAIL_DEFAULT_CHARSET
     * @return decoded string, or "" on failure/blank input
     */
    static String decodeBase64(String message, Object charset) {
        String result = "";
        try {
            if (StringUtil.isNotBlank(message)) {
                if (charset == null) {
                    result = Base64.decodeStr(message, FlowWriteConfig.MAIL_DEFAULT_CHARSET);
                } else {
                    result = Base64.decodeStr(message, charset.toString());
                }
            }
        } catch (RuntimeException rune) {
            logger.error("解析 Base64 异常,异常信息:" + rune);
        }
        return result;
    }

    /**
     * Evaluates a JsonPath expression and returns the first match.
     *
     * @param message JSON document
     * @param expr    JsonPath expression
     * @return first matched value, or "" when there is no match or the path is invalid
     */
    static String flattenSpec(String message, String expr) {
        String flattenResult = "";
        try {
            if (StringUtil.isNotBlank(expr)) {
                ArrayList<String> read = JsonPath.parse(message).read(expr);
                // Fix: an empty result list used to throw an uncaught
                // IndexOutOfBoundsException (not covered by the catch below).
                if (read != null && !read.isEmpty()) {
                    flattenResult = read.get(0);
                }
            }
        } catch (ClassCastException | InvalidPathException e) {
            logger.error("设备标签解析异常,[ " + expr + " ]解析表达式错误" + e);
        }
        return flattenResult;
    }


    /**
     * If the parameter carries the json-key tag, resolves it against the bean;
     * otherwise returns the literal string.
     *
     * @param object dynamic POJO
     * @param param  field reference or plain string
     * @return resolved field value or the original string
     */
    static Object isJsonValue(Object object, String param) {
        if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) {
            return JsonParseUtil.getValue(object, param.substring(2));
        } else {
            return param;
        }
    }

    /**
     * If the parameter carries the json-key tag, resolves it against the map;
     * otherwise returns the literal string.
     *
     * @param jsonMap parsed log record
     * @param param   field reference or plain string
     * @return resolved field value or the original string
     */
    static Object isJsonValue(Map<String, Object> jsonMap, String param) {
        if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) {
            return JsonParseUtil.getValue(jsonMap, param.substring(2));
        } else {
            return param;
        }
    }

    /**
     * IF function over a dynamic POJO: "cond|a|b" ternary where operands may
     * be field references; numeric results are cast via TypeUtils.
     *
     * @param object  dynamic POJO
     * @param ifParam "condition FORMAT_SPLITTER resultA FORMAT_SPLITTER resultB"
     * @return resultA or resultB (type-cast), or null on malformed input
     */
    static Object condition(Object object, String ifParam) {
        Object result = null;
        try {
            String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
            if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
                String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
                Object direction = isJsonValue(object, norms[0]);
                Object resultA = isJsonValue(object, split[1]);
                Object resultB = isJsonValue(object, split[2]);
                if (direction instanceof Number) {
                    result = TypeUtils.castToIfFunction((Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB);
                } else if (direction instanceof String) {
                    result = TypeUtils.castToIfFunction(direction.equals(norms[1]) ? resultA : resultB);
                }
            }
        } catch (RuntimeException e) {
            logger.error("IF 函数执行异常,异常信息:" + e);
        }
        return result;
    }

    /**
     * IF function over a map record. NOTE: unlike the POJO overload this one
     * intentionally returns the raw operand without TypeUtils casting (the
     * map pipeline applies type coercion later).
     *
     * @param jsonMap parsed log record
     * @param ifParam "condition FORMAT_SPLITTER resultA FORMAT_SPLITTER resultB"
     * @return resultA or resultB, or null on malformed input
     */
    static Object condition(Map<String, Object> jsonMap, String ifParam) {
        Object result = null;
        try {
            String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
            if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
                String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
                Object direction = isJsonValue(jsonMap, norms[0]);
                Object resultA = isJsonValue(jsonMap, split[1]);
                Object resultB = isJsonValue(jsonMap, split[2]);
                if (direction instanceof Number) {
                    result = (Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB;
                } else if (direction instanceof String) {
                    result = direction.equals(norms[1]) ? resultA : resultB;
                }
            }
        } catch (RuntimeException e) {
            logger.error("IF 函数执行异常,异常信息:" + e);
        }
        return result;
    }

    /**
     * Fixed-value function: digit-only strings become Long, anything else is
     * returned as-is.
     *
     * @param param default value from the schema
     * @return Long for numeric input, the original string otherwise, null on failure
     */
    static Object setValue(String param) {
        try {
            // Fix: "[0-9]*" matches "", and Long.parseLong("") used to throw,
            // turning an empty default into null; keep "" as a plain string.
            Matcher isNum = PATTERN.matcher(param);
            if (!param.isEmpty() && isNum.matches()) {
                return Long.parseLong(param);
            }
            return param;
        } catch (RuntimeException e) {
            logger.error("SetValue 函数异常,异常信息:" + e);
        }
        return null;
    }
}
package com.zdjizhi.utils.hbase;

import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.DefaultProConfig;
import com.zdjizhi.common.FlowWriteConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * HBase-backed ip -> account cache. The full table is loaded once on first
 * use and a scheduled task then pulls incremental changes by timestamp range.
 *
 * @author qidaijie
 */

public class HBaseUtils {
    private static final Log logger = LogFactory.get();
    // ip -> account cache; ConcurrentHashMap because readers and the refresh
    // task touch it from different threads
    private static Map<String, String> subIdMap = new ConcurrentHashMap<>(83334);
    private static Connection connection;
    // high-water mark (millis) of the last successful scan
    private static Long time;

    private static String zookeeperIp;
    private static String hBaseTable;

    // lazily-created singleton; volatile so the double-checked init below is safe
    private static volatile HBaseUtils hBaseUtils;

    /**
     * Creates the singleton. Synchronized: the original unsynchronized
     * check-then-create could build two instances (and two schedulers) when
     * getAccount() was first called from several threads at once.
     */
    private static synchronized void getInstance() {
        if (hBaseUtils == null) {
            hBaseUtils = new HBaseUtils();
        }
    }


    /**
     * Wires configuration, opens the connection, loads the full table and
     * starts the periodic refresh.
     */
    private HBaseUtils() {
        zookeeperIp = FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS;
        hBaseTable = DefaultProConfig.HBASE_TABLE_NAME;
        //获取连接
        getConnection();
        //拉取所有
        getAll();
        //定时更新
        updateCache();
    }

    /**
     * Opens the HBase connection and records the start-of-cache timestamp.
     */
    private static void getConnection() {
        try {
            Configuration configuration = HBaseConfiguration.create();
            // zookeeper quorum plus bounded retries so a broken cluster fails fast
            configuration.set("hbase.zookeeper.quorum", zookeeperIp);
            configuration.set("hbase.client.retries.number", "3");
            configuration.set("hbase.bulkload.retries.number", "3");
            configuration.set("zookeeper.recovery.retry", "3");
            connection = ConnectionFactory.createConnection(configuration);
            time = System.currentTimeMillis();
            logger.warn("HBaseUtils get HBase connection,now to getAll().");
        } catch (IOException ioe) {
            logger.error("HBaseUtils getHbaseConn() IOException===>{" + ioe + "}<===");
        } catch (RuntimeException e) {
            logger.error("HBaseUtils getHbaseConn() Exception===>{" + e + "}<===");
        }
    }

    /**
     * Pulls the rows changed since the last refresh into the cache.
     */
    private static void change() {
        if (hBaseUtils == null) {
            getInstance();
        }
        long nowTime = System.currentTimeMillis();
        // overlap the窗口 slightly on both sides so edits near the boundary are not missed
        timestampsFilter(time - 1000, nowTime + 500);
    }


    /**
     * Scans the table for cells whose timestamp falls in [startTime, endTime)
     * and merges them into the cache.
     *
     * @param startTime range start (millis, inclusive)
     * @param endTime   range end (millis, exclusive)
     */
    private static void timestampsFilter(Long startTime, Long endTime) {
        Long begin = System.currentTimeMillis();
        Table table = null;
        ResultScanner scanner = null;
        Scan scan2 = new Scan();
        try {
            table = connection.getTable(TableName.valueOf("sub:" + hBaseTable));
            scan2.setTimeRange(startTime, endTime);
            scanner = table.getScanner(scan2);
            for (Result result : scanner) {
                Cell[] cells = result.rawCells();
                for (Cell cell : cells) {
                    String key = Bytes.toString(CellUtil.cloneRow(cell)).trim();
                    String value = Bytes.toString(CellUtil.cloneValue(cell)).trim();
                    // only write when the value actually changed, to limit churn
                    if (!value.equals(subIdMap.get(key))) {
                        subIdMap.put(key, value);
                    }
                }
            }
            Long end = System.currentTimeMillis();
            logger.warn("HBaseUtils Now subIdMap.keySet().size() is: " + subIdMap.keySet().size());
            logger.warn("HBaseUtils Update cache timeConsuming is: " + (end - begin) + ",BeginTime: " + startTime + ",EndTime: " + endTime);
            // advance the high-water mark only after a successful scan
            time = endTime;
        } catch (IOException ioe) {
            logger.error("HBaseUtils timestampsFilter is IOException===>{" + ioe + "}<===");
        } catch (RuntimeException e) {
            logger.error("HBaseUtils timestampsFilter is Exception===>{" + e + "}<===");
        } finally {
            if (scanner != null) {
                scanner.close();
            }
            if (table != null) {
                try {
                    table.close();
                } catch (IOException e) {
                    logger.error("HBase Table Close ERROR! Exception message is:" + e);
                }
            }
        }
    }

    /**
     * Loads the whole table into the cache (initial fill).
     */
    private static void getAll() {
        long begin = System.currentTimeMillis();
        Table table = null;
        ResultScanner scanner = null;
        try {
            table = connection.getTable(TableName.valueOf("sub:" + hBaseTable));
            scanner = table.getScanner(new Scan());
            for (Result result : scanner) {
                Cell[] cells = result.rawCells();
                for (Cell cell : cells) {
                    subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
                }
            }
            logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size(): " + subIdMap.size());
            logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size() timeConsuming is: " + (System.currentTimeMillis() - begin));
        } catch (IOException ioe) {
            logger.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<===");
        } catch (RuntimeException e) {
            logger.error("HBaseUtils getAll() is Exception===>{" + e + "}<===");
        } finally {
            // Fix: the original closed the scanner only on success and never
            // closed the Table, leaking resources on every initial load/error.
            if (scanner != null) {
                scanner.close();
            }
            if (table != null) {
                try {
                    table.close();
                } catch (IOException e) {
                    logger.error("HBase Table Close ERROR! Exception message is:" + e);
                }
            }
        }
    }

    /**
     * Starts the periodic incremental refresh (every HBASE_TICK_TUPLE_FREQ_SECS).
     */
    private void updateCache() {
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    // a frequency of 0 disables refreshing
                    if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) {
                        change();
                    }
                } catch (RuntimeException e) {
                    logger.error("HBaseUtils update hbaseCache is error===>{" + e + "}<===");
                }
            }
        }, 1, FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS);
    }


    /**
     * Looks up the account for a client IP, initializing the cache on first use.
     *
     * @param clientIp client_ip
     * @return account, or null when unknown
     */
    public static String getAccount(String clientIp) {
        if (hBaseUtils == null) {
            getInstance();
        }
        return subIdMap.get(clientIp);
    }

}
package com.zdjizhi.utils.http;

import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

/**
 * Utility for fetching the log schema from the gateway.
 *
 * @author qidaijie
 */
public class HttpClientUtil {
    private static final Log logger = LogFactory.get();

    /**
     * Issues an HTTP GET and returns the first line of the response body
     * (the gateway serialises the schema JSON on a single line).
     *
     * @param http gateway URL
     * @return first response line, or "" when the request fails or the body is missing
     */
    public static String requestByGetMethod(String http) {
        CloseableHttpClient httpClient = HttpClients.createDefault();
        HttpGet get = new HttpGet(http);
        BufferedReader bufferedReader = null;
        CloseableHttpResponse httpResponse = null;
        try {
            httpResponse = httpClient.execute(get);
            HttpEntity entity = httpResponse.getEntity();
            if (null != entity) {
                // StandardCharsets.UTF_8 instead of the "UTF-8" string: no
                // checked UnsupportedEncodingException to worry about.
                bufferedReader = new BufferedReader(
                        new InputStreamReader(httpResponse.getEntity().getContent(), StandardCharsets.UTF_8), 8 * 1024);
                StringBuilder entityStringBuilder = new StringBuilder();
                int intC;
                // read character-by-character up to (excluding) the first newline
                while ((intC = bufferedReader.read()) != -1) {
                    char c = (char) intC;
                    if (c == '\n') {
                        break;
                    }
                    entityStringBuilder.append(c);
                }
                return entityStringBuilder.toString();
            }
        } catch (IOException e) {
            logger.error("Get Schema from Query engine ERROR! Exception message is:" + e);
        } finally {
            if (httpClient != null) {
                try {
                    httpClient.close();
                } catch (IOException e) {
                    // fixed typo: "messgae" -> "message"
                    logger.error("Close HTTP Client ERROR! Exception message is:" + e);
                }
            }
            if (httpResponse != null) {
                try {
                    httpResponse.close();
                } catch (IOException e) {
                    // fixed typo: "messgae" -> "message"
                    logger.error("Close httpResponse ERROR! Exception message is:" + e);
                }
            }
            if (bufferedReader != null) {
                IOUtils.closeQuietly(bufferedReader);
            }
        }
        return "";
    }
}
* @param obj 对象 + * @param property key + * @return 属性的值 + */ + public static Object getValue(Object obj, String property) { + try { + BeanMap beanMap = BeanMap.create(obj); + return beanMap.get(property); + } catch (RuntimeException e) { + logger.error("获取json-value异常,异常key:" + property + "异常信息为:" + e); + return null; + } + } + + /** + * 获取属性值的方法 + * + * @param jsonMap 原始日志 + * @param property key + * @return 属性的值 + */ + public static Object getValue(Map<String, Object> jsonMap, String property) { + try { + return jsonMap.getOrDefault(property, null); + } catch (RuntimeException e) { + logger.error("获取json-value异常,异常key:" + property + "异常信息为:" + e); + return null; + } + } + + /** + * 更新属性值的方法 + * + * @param jsonMap 原始日志json map + * @param property 更新的key + * @param value 更新的值 + */ + public static void setValue(Map<String, Object> jsonMap, String property, Object value) { + try { + jsonMap.put(property, value); + } catch (RuntimeException e) { + logger.error("赋予实体类错误类型数据", e); + } + } + + /** + * 更新属性值的方法 + * + * @param obj 对象 + * @param property 更新的key + * @param value 更新的值 + */ + public static void setValue(Object obj, String property, Object value) { + try { + BeanMap beanMap = BeanMap.create(obj); + beanMap.put(property, value); + } catch (ClassCastException e) { + logger.error("赋予实体类错误类型数据", e); + } + } + + /** + * 根据反射生成对象的方法 + * + * @param properties 反射类用的map + * @return 生成的Object类型的对象 + */ + public static Object generateObject(Map properties) { + BeanGenerator generator = new BeanGenerator(); + Set keySet = properties.keySet(); + for (Object aKeySet : keySet) { + String key = (String) aKeySet; + generator.addProperty(key, (Class) properties.get(key)); + } + return generator.create(); + } + + /** + * 通过获取String类型的网关schema链接来获取map,用于生成一个Object类型的对象 + * + * @param http 网关schema地址 + * @return 用于反射生成schema类型的对象的一个map集合 + */ + public static HashMap<String, Class> getMapFromHttp(String http) { + HashMap<String, Class> map = new HashMap<>(16); + + String schema = 
HttpClientUtil.requestByGetMethod(http); + Object data = JSON.parseObject(schema).get("data"); + + //获取fields,并转化为数组,数组的每个元素都是一个name doc type + JSONObject schemaJson = JSON.parseObject(data.toString()); + JSONArray fields = (JSONArray) schemaJson.get("fields"); + + for (Object field : fields) { + String filedStr = field.toString(); + if (checkKeepField(filedStr)) { + String name = JsonPath.read(filedStr, "$.name").toString(); + String type = JsonPath.read(filedStr, "$.type").toString(); + if (type.contains("{")) { + type = JsonPath.read(filedStr, "$.type.type").toString(); + } + //组合用来生成实体类的map + map.put(name, getClassName(type)); + } else { + dropList.add(filedStr); + } + } + return map; + } + + /** + * 判断字段是否需要保留 + * + * @param message 单个field-json + * @return true or false + */ + private static boolean checkKeepField(String message) { + boolean isKeepField = true; + boolean isHiveDoc = JSON.parseObject(message).containsKey("doc"); + if (isHiveDoc) { + boolean isHiveVi = JsonPath.read(message, "$.doc").toString().contains("visibility"); + if (isHiveVi) { + String visibility = JsonPath.read(message, "$.doc.visibility").toString(); + if (FlowWriteConfig.VISIBILITY.equals(visibility)) { + isKeepField = false; + } + } + } + return isKeepField; + } + + static void dropJsonField(Map<String, Object> jsonMap) { + for (String field : dropList) { + jsonMap.remove(field); + } + } + + /** + * 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList paramlist) + * + * @param http 网关url + * @return 任务列表 + */ + public static ArrayList<String[]> getJobListFromHttp(String http) { + ArrayList<String[]> list = new ArrayList<>(); + + String schema = HttpClientUtil.requestByGetMethod(http); + //解析data + Object data = JSON.parseObject(schema).get("data"); + + //获取fields,并转化为数组,数组的每个元素都是一个name doc type + JSONObject schemaJson = JSON.parseObject(data.toString()); + JSONArray fields = (JSONArray) schemaJson.get("fields"); + + for (Object field : fields) { + + if 
(JSON.parseObject(field.toString()).containsKey("doc")) { + Object doc = JSON.parseObject(field.toString()).get("doc"); + + if (JSON.parseObject(doc.toString()).containsKey("format")) { + String name = JSON.parseObject(field.toString()).get("name").toString(); + Object format = JSON.parseObject(doc.toString()).get("format"); + JSONObject formatObject = JSON.parseObject(format.toString()); + + String functions = formatObject.get("functions").toString(); + String appendTo = null; + String params = null; + + if (formatObject.containsKey("appendTo")) { + appendTo = formatObject.get("appendTo").toString(); + } + + if (formatObject.containsKey("param")) { + params = formatObject.get("param").toString(); + } + + + if (StringUtil.isNotBlank(appendTo) && StringUtil.isBlank(params)) { + String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER); + String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER); + + for (int i = 0; i < functionArray.length; i++) { + list.add(new String[]{name, appendToArray[i], functionArray[i], null}); + } + + } else if (StringUtil.isNotBlank(appendTo) && StringUtil.isNotBlank(params)) { + String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER); + String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER); + String[] paramArray = params.split(FlowWriteConfig.FORMAT_SPLITTER); + + for (int i = 0; i < functionArray.length; i++) { + list.add(new String[]{name, appendToArray[i], functionArray[i], paramArray[i]}); + + } + } else { + list.add(new String[]{name, name, functions, params}); + } + + } + } + + } + return list; + } + +}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java new file mode 100644 index 0000000..0b6bc1e --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java @@ -0,0 +1,187 @@ +package com.zdjizhi.utils.json; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.exception.FlowWriteException; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author qidaijie + * @Package PACKAGE_NAME + * @Description: + * @date 2021/7/1217:34 + */ +public class JsonTypeUtils { + private static final Log logger = LogFactory.get(); + /** + * 在内存中加载反射类用的map + */ + private static HashMap<String, Class> map = JsonParseUtil.getMapFromHttp(FlowWriteConfig.SCHEMA_HTTP); + + /** + * 类型转换 + * + * @param jsonMap 原始日志map + */ + public static Map<String, Object> typeTransform(Map<String, Object> jsonMap) throws RuntimeException { + JsonParseUtil.dropJsonField(jsonMap); + HashMap<String, Object> tmpMap = new HashMap<>(192); + for (String key : jsonMap.keySet()) { + if (map.containsKey(key)) { + String simpleName = map.get(key).getSimpleName(); + switch (simpleName) { + case "String": + tmpMap.put(key, checkString(jsonMap.get(key))); + break; + case "Integer": + tmpMap.put(key, getIntValue(jsonMap.get(key))); + break; + case "long": + tmpMap.put(key, checkLongValue(jsonMap.get(key))); + break; + case "List": + tmpMap.put(key, checkArray(jsonMap.get(key))); + break; + case "Map": + tmpMap.put(key, checkObject(jsonMap.get(key))); + break; + case "double": + tmpMap.put(key, checkDouble(jsonMap.get(key))); + break; + default: + tmpMap.put(key, checkString(jsonMap.get(key))); + } + } + } + return tmpMap; + } + + /** + * String 类型检验转换方法 + * + * @param value json value + * @return String value + */ + private static String 
checkString(Object value) { + if (value == null) { + return null; + } + + if (value instanceof Map){ + return JsonMapper.toJsonString(value); + } + + if (value instanceof List){ + return JsonMapper.toJsonString(value); + } + + return value.toString(); + } + + /** + * Map type validation/conversion method (NOTE(review): original doc said "array"/"List" — copy-paste error). + * + * @param value json value + * @return Map value + */ + private static Map checkObject(Object value) { + if (value == null) { + return null; + } + + if (value instanceof Map) { + return (Map) value; + } + + throw new FlowWriteException("can not cast to map, value : " + value); + } + + /** + * List type validation/conversion method. + * + * @param value json value + * @return List value + */ + private static List checkArray(Object value) { + if (value == null) { + return null; + } + + if (value instanceof List) { + return (List) value; + } + + throw new FlowWriteException("can not cast to List, value : " + value); + } + + /** Long coercion via TypeUtils#castToLong; returns null for null input. */ + private static Long checkLong(Object value) { + if (value == null) { + return null; + } + + return TypeUtils.castToLong(value); + } + + /** + * long type coercion; returns the primitive default 0L when the coerced value is null. + * + * @param value json value + * @return Long value + */ + private static long checkLongValue(Object value) { + Long longVal = TypeUtils.castToLong(value); + if (longVal == null) { + return 0L; + } + +// return longVal.longValue(); + return longVal; + } + + /** + * Double type validation/conversion method. + * + * @param value json value + * @return Double value + */ + private static Double checkDouble(Object value) { + if (value == null) { + return null; + } + + return TypeUtils.castToDouble(value); + } + + + /** Integer coercion via TypeUtils#castToInt; returns null for null input. */ + private static Integer checkInt(Object value) { + if (value == null) { + return null; + } + + return TypeUtils.castToInt(value); + } + + + /** + * int type coercion; returns the primitive default 0 when the coerced value is null. + * + * @param value json value + * @return int value + */ + private static int getIntValue(Object value) { + + Integer intVal = TypeUtils.castToInt(value); + if (intVal == null) { + return 0; + } + +// return intVal.intValue(); + return intVal; + } + +} diff --git 
a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java new file mode 100644 index 0000000..b13627f --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java @@ -0,0 +1,171 @@ +package com.zdjizhi.utils.json; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.exception.FlowWriteException; + +/** + * @author qidaijie + * @Package PACKAGE_NAME + * @Description: + * @date 2021/7/1218:20 + */ +public class TypeUtils { + private static final Log logger = LogFactory.get(); + + /** + * Integer 类型判断方法 + * + * @param value json value + * @return Integer value or null + */ + public static Object castToIfFunction(Object value) { + if (value == null) { + return null; + } + + if (value instanceof String) { + return value.toString(); + } + + if (value instanceof Integer) { + return ((Number) value).intValue(); + } + + if (value instanceof Long) { + return ((Number) value).longValue(); + } + +// if (value instanceof Map) { +// return (Map) value; +// } +// +// if (value instanceof List) { +// return Collections.singletonList(value.toString()); +// } + + if (value instanceof Boolean) { + return (Boolean) value ? 
1 : 0; + } + + throw new FlowWriteException("can not cast to int, value : " + value); + } + + /** + * Integer type coercion method. + * + * @param value json value + * @return Integer value or null + */ + static Integer castToInt(Object value) { + if (value == null) { + return null; + } + + if (value instanceof Integer) { + return (Integer) value; + } + + if (value instanceof Number) { + return ((Number) value).intValue(); + } + + if (value instanceof String) { + String strVal = (String) value; + if (StringUtil.isBlank(strVal)) { + return null; + } + + //keep only the first segment of values like "10,20" + if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) { + strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0]; + } + + try { + return Integer.parseInt(strVal); + } catch (NumberFormatException ex) { + logger.error("String change Integer Error,The error Str is:" + strVal); + } + } + + if (value instanceof Boolean) { + return (Boolean) value ? 1 : 0; + } + + throw new FlowWriteException("can not cast to int, value : " + value); + } + + /** + * Double type coercion method. + * NOTE(review): unlike castToInt/castToLong there is no null guard here, so a null value falls through to the final throw; callers (e.g. JsonTypeUtils.checkDouble) null-check first — TODO confirm intended. + * + * @param value json value + * @return double value or null + */ + static Double castToDouble(Object value) { + + if (value instanceof Number) { + return ((Number) value).doubleValue(); + } + + if (value instanceof String) { + String strVal = (String) value; + + if (StringUtil.isBlank(strVal)) { + return null; + } + + //keep only the first segment of values like "10,20" + if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) { + strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0]; + } + + try { + return Double.parseDouble(strVal); + } catch (NumberFormatException ex) { + logger.error("String change Double Error,The error Str is:" + strVal); + } + } + + throw new FlowWriteException("can not cast to double, value : " + value); + } + + /** + * Long type coercion method. + * + * @param value json value + * @return (Long)value or null + */ + static Long castToLong(Object value) { + if (value == null) { + return null; + } + + if (value instanceof Number) { + return ((Number) value).longValue(); + } 
+ + if (value instanceof String) { + String strVal = (String) value; + + if (StringUtil.isBlank(strVal)) { + return null; + } + + //将 10,20 类数据转换为10 + if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) { + strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0]; + } + + try { + return Long.parseLong(strVal); + } catch (NumberFormatException ex) { + logger.error("String change Long Error,The error Str is:" + strVal); + } + } + + throw new FlowWriteException("can not cast to long, value : " + value); + } + +} diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java new file mode 100644 index 0000000..c220064 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java @@ -0,0 +1,44 @@ +package com.zdjizhi.utils.kafka; + +import com.zdjizhi.common.FlowWriteConfig; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; + + +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2021/6/813:54 + */ +public class Consumer { + private static Properties createConsumerConfig() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", FlowWriteConfig.INPUT_KAFKA_SERVERS); + properties.put("group.id", FlowWriteConfig.GROUP_ID); + properties.put("session.timeout.ms", "60000"); + properties.put("max.poll.records", "3000"); + properties.put("max.partition.fetch.bytes", "31457280"); + properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + + /* + * kafka限流配置-20201117 + */ +// properties.put(ConsumerConfig.CLIENT_ID_CONFIG, FlowWriteConfig.CONSUMER_CLIENT_ID); + return properties; + } + + public static FlinkKafkaConsumer<String> 
getKafkaConsumer() { + FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.INPUT_KAFKA_TOPIC, + new SimpleStringSchema(), createConsumerConfig()); + + kafkaConsumer.setCommitOffsetsOnCheckpoints(false); + kafkaConsumer.setStartFromGroupOffsets(); + + return kafkaConsumer; + } +} diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/Producer.java new file mode 100644 index 0000000..077ae71 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/kafka/Producer.java @@ -0,0 +1,53 @@ +package com.zdjizhi.utils.kafka; + +import com.zdjizhi.common.DefaultProConfig; +import com.zdjizhi.common.FlowWriteConfig; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; + +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2021/6/814:04 + */ +public class Producer { + + private static Properties createProducerConfig() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", FlowWriteConfig.OUTPUT_KAFKA_SERVERS); +// properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); +// properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("acks", FlowWriteConfig.PRODUCER_ACK); + properties.put("retries", DefaultProConfig.RETRIES); + properties.put("linger.ms", DefaultProConfig.LINGER_MS); + properties.put("request.timeout.ms", DefaultProConfig.REQUEST_TIMEOUT_MS); + properties.put("batch.size", DefaultProConfig.BATCH_SIZE); + properties.put("buffer.memory", DefaultProConfig.BUFFER_MEMORY); + properties.put("max.request.size", DefaultProConfig.MAX_REQUEST_SIZE); + + /** + * kafka限流配置-20201117 + */ +// 
properties.put(ProducerConfig.CLIENT_ID_CONFIG, FlowWriteConfig.PRODUCER_CLIENT_ID); +// properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, FlowWriteConfig.PRODUCER_KAFKA_COMPRESSION_TYPE); + return properties; + } + + + public static FlinkKafkaProducer<String> getKafkaProducer() { + FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>( + FlowWriteConfig.OUTPUT_KAFKA_TOPIC, + new SimpleStringSchema(), + createProducerConfig()); + + kafkaProducer.setLogFailuresOnly(false); +// kafkaProducer.setWriteTimestampToKafka(true); + + return kafkaProducer; + } +} diff --git a/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java new file mode 100644 index 0000000..08fa29b --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java @@ -0,0 +1,70 @@ +package com.zdjizhi.utils.system; + +import com.zdjizhi.utils.StringUtil; + +import java.io.IOException; +import java.util.Locale; +import java.util.Properties; + + +/** + * @author Administrator + */ + +public final class FlowWriteConfigurations { + + private static Properties propKafka = new Properties(); + private static Properties propService = new Properties(); + + + public static String getStringProperty(Integer type, String key) { + if (type == 0) { + return propService.getProperty(key); + } else if (type == 1) { + return propKafka.getProperty(key); + } else { + return null; + } + + } + + public static Integer getIntProperty(Integer type, String key) { + if (type == 0) { + return Integer.parseInt(propService.getProperty(key)); + } else if (type == 1) { + return Integer.parseInt(propKafka.getProperty(key)); + } else { + return null; + } + } + + public static Long getLongProperty(Integer type, String key) { + if (type == 0) { + return Long.parseLong(propService.getProperty(key)); + } else if (type == 1) { + return Long.parseLong(propKafka.getProperty(key)); + } else { + return null; + } + } 
+ + /* BUG(review): the property value is lower-cased, trimmed, then upper-cased (yielding e.g. "TRUE"), but compared against the lowercase literal "true" — StringUtil.equals can never match, so this effectively always returns false; equalsIgnoreCase is presumably intended. Also NPEs when the key is absent (getProperty returns null). TODO confirm. */ + public static Boolean getBooleanProperty(Integer type, String key) { + if (type == 0) { + return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true"); + } else if (type == 1) { + return StringUtil.equals(propKafka.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true"); + } else { + return null; + } + } + + /* NOTE(review): on any load failure both Properties are nulled, so every later getter will NPE instead of failing fast. */ + static { + try { + propService.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties")); + propKafka.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties")); + } catch (IOException | RuntimeException e) { + propKafka = null; + propService = null; + } + } +} diff --git a/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java b/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java new file mode 100644 index 0000000..2afab03 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java @@ -0,0 +1,190 @@ +package com.zdjizhi.utils.zookeeper; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import org.apache.zookeeper.*; +import org.apache.zookeeper.data.Stat; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; + +/** + * ZooKeeper-based distributed lock backed by ephemeral sequential nodes under the /locks root. + * + * @author qidaijie + */ +public class DistributedLock implements Lock, Watcher { + private static final Log logger = LogFactory.get(); + + private ZooKeeper zk = null; + /** + * root node path + */ + private final String ROOT_LOCK = "/locks"; + /** + * name of the contended resource + */ + private String lockName; + /** + * the previous lock node to wait on + */ + private String waitLock; + /** + * the lock node held by this instance + */ + private String currentLock; + /** + * latch used to wait for the previous node's removal + */ + private CountDownLatch countDownLatch; + + private int sessionTimeout = 2000; + + private List<Exception> 
exceptionList = new ArrayList<Exception>(); + + /** + * 配置分布式锁 + * + * @param config 连接的url + * @param lockName 竞争资源 + */ + public DistributedLock(String config, String lockName) { + this.lockName = lockName; + try { + // 连接zookeeper + zk = new ZooKeeper(config, sessionTimeout, this); + Stat stat = zk.exists(ROOT_LOCK, false); + if (stat == null) { + // 如果根节点不存在,则创建根节点 + zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + } catch (IOException | InterruptedException | KeeperException e) { + logger.error("Node already exists!"); + } + } + + // 节点监视器 + @Override + public void process(WatchedEvent event) { + if (this.countDownLatch != null) { + this.countDownLatch.countDown(); + } + } + + @Override + public void lock() { + if (exceptionList.size() > 0) { + throw new LockException(exceptionList.get(0)); + } + try { + if (this.tryLock()) { + logger.info(Thread.currentThread().getName() + " " + lockName + "获得了锁"); + } else { + // 等待锁 + waitForLock(waitLock, sessionTimeout); + } + } catch (InterruptedException | KeeperException e) { + logger.error("获取锁异常" + e); + } + } + + @Override + public boolean tryLock() { + try { + String splitStr = "_lock_"; + if (lockName.contains(splitStr)) { + throw new LockException("锁名有误"); + } + // 创建临时有序节点 + currentLock = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0], + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + // 取所有子节点 + List<String> subNodes = zk.getChildren(ROOT_LOCK, false); + // 取出所有lockName的锁 + List<String> lockObjects = new ArrayList<String>(); + for (String node : subNodes) { + String tmpNode = node.split(splitStr)[0]; + if (tmpNode.equals(lockName)) { + lockObjects.add(node); + } + } + Collections.sort(lockObjects); + // 若当前节点为最小节点,则获取锁成功 + if (currentLock.equals(ROOT_LOCK + "/" + lockObjects.get(0))) { + return true; + } + // 若不是最小节点,则找到自己的前一个节点 + String prevNode = currentLock.substring(currentLock.lastIndexOf("/") + 1); + waitLock = 
lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1); + } catch (InterruptedException | KeeperException e) { + logger.error("获取锁过程异常" + e); + } + return false; + } + + + @Override + public boolean tryLock(long timeout, TimeUnit unit) { + try { + if (this.tryLock()) { + return true; + } + return waitForLock(waitLock, timeout); + } catch (KeeperException | InterruptedException | RuntimeException e) { + logger.error("判断是否锁定异常" + e); + } + return false; + } + + // wait for the previous lock node to go away + private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException { + Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true); + + if (stat != null) { + this.countDownLatch = new CountDownLatch(1); + // wait on the latch; when the previous node disappears, process() counts down and the wait ends + this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS); + this.countDownLatch = null; + } + /* NOTE(review): the boolean result of await() is discarded, so a timeout is reported the same as acquiring the lock — TODO confirm intended. */ + return true; + } + + /** NOTE(review): unlock() also closes the ZooKeeper session (zk.close()), so this lock instance is single-use. */ + @Override + public void unlock() { + try { + zk.delete(currentLock, -1); + currentLock = null; + zk.close(); + } catch (InterruptedException | KeeperException e) { + logger.error("关闭锁异常" + e); + } + } + + /** Conditions are not supported; always returns null. */ + @Override + public Condition newCondition() { + return null; + } + + @Override + public void lockInterruptibly() throws InterruptedException { + this.lock(); + } + + + /** RuntimeException wrapper used for lock setup/usage errors. */ + public class LockException extends RuntimeException { + private static final long serialVersionUID = 1L; + + public LockException(String e) { + super(e); + } + + public LockException(Exception e) { + super(e); + } + } + +}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java b/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java new file mode 100644 index 0000000..ebf4368 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java @@ -0,0 +1,139 @@ +package com.zdjizhi.utils.zookeeper; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import org.apache.zookeeper.*; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +/** + * @author qidaijie + * @Package cn.ac.iie.utils.zookeeper + * @Description: + * @date 2020/11/1411:28 + */ +public class ZookeeperUtils implements Watcher { + private static final Log logger = LogFactory.get(); + + private ZooKeeper zookeeper; + + private static final int SESSION_TIME_OUT = 20000; + + private CountDownLatch countDownLatch = new CountDownLatch(1); + + @Override + public void process(WatchedEvent event) { + if (event.getState() == Event.KeeperState.SyncConnected) { + countDownLatch.countDown(); + } + } + + + /** + * 修改节点信息 + * + * @param path 节点路径 + */ + public int modifyNode(String path, String zookeeperIp) { + createNode(path, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, zookeeperIp); + int workerId = 0; + try { + connectZookeeper(zookeeperIp); + Stat stat = zookeeper.exists(path, true); + workerId = Integer.parseInt(getNodeDate(path)); + if (workerId > 63) { + workerId = 0; + zookeeper.setData(path, "1".getBytes(), stat.getVersion()); + } else { + String result = String.valueOf(workerId + 1); + if (stat != null) { + zookeeper.setData(path, result.getBytes(), stat.getVersion()); + } else { + logger.error("Node does not exist!,Can't modify"); + } + } + } catch (KeeperException | InterruptedException e) { + logger.error("modify error Can't modify," + e); + } finally { + closeConn(); + } + 
logger.warn("workerID is:" + workerId); + return workerId; + } + + /** + * 连接zookeeper + * + * @param host 地址 + */ + public void connectZookeeper(String host) { + try { + zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this); + countDownLatch.await(); + } catch (IOException | InterruptedException e) { + logger.error("Connection to the Zookeeper Exception! message:" + e); + } + } + + /** + * 关闭连接 + */ + public void closeConn() { + try { + if (zookeeper != null) { + zookeeper.close(); + } + } catch (InterruptedException e) { + logger.error("Close the Zookeeper connection Exception! message:" + e); + } + } + + /** + * 获取节点内容 + * + * @param path 节点路径 + * @return 内容/异常null + */ + public String getNodeDate(String path) { + String result = null; + Stat stat = new Stat(); + try { + byte[] resByte = zookeeper.getData(path, true, stat); + + result = StrUtil.str(resByte, "UTF-8"); + } catch (KeeperException | InterruptedException e) { + logger.error("Get node information exception" + e); + } + return result; + } + + /** + * @param path 节点创建的路径 + * @param date 节点所存储的数据的byte[] + * @param acls 控制权限策略 + */ + public void createNode(String path, byte[] date, List<ACL> acls, String zookeeperIp) { + try { + connectZookeeper(zookeeperIp); + Stat exists = zookeeper.exists(path, true); + if (exists == null) { + Stat existsSnowflakeld = zookeeper.exists("/Snowflake", true); + if (existsSnowflakeld == null) { + zookeeper.create("/Snowflake", null, acls, CreateMode.PERSISTENT); + } + zookeeper.create(path, date, acls, CreateMode.PERSISTENT); + } else { + logger.warn("Node already exists ! 
Don't need to create"); + } + } catch (KeeperException | InterruptedException e) { + logger.error(e); + } finally { + closeConn(); + } + } +} diff --git a/src/main/log4j.properties b/src/main/log4j.properties new file mode 100644 index 0000000..9d91936 --- /dev/null +++ b/src/main/log4j.properties @@ -0,0 +1,25 @@ +#Log4j +log4j.rootLogger=info,console,file +# 控制台日志设置 +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.Threshold=info +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n + +# 文件日志设置 +log4j.appender.file=org.apache.log4j.DailyRollingFileAppender +log4j.appender.file.Threshold=info +log4j.appender.file.encoding=UTF-8 +log4j.appender.file.Append=true +#路径请用相对路径,做好相关测试输出到应用目下 +log4j.appender.file.file=${nis.root}/log/galaxy-name.log +log4j.appender.file.DatePattern='.'yyyy-MM-dd +log4j.appender.file.layout=org.apache.log4j.PatternLayout +#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n +log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n +#MyBatis 配置,com.nis.web.dao是mybatis接口所在包 +log4j.logger.com.nis.web.dao=debug +#bonecp数据源配置 +log4j.category.com.jolbox=debug,console + + diff --git a/src/main/logback.xml b/src/main/logback.xml new file mode 100644 index 0000000..a508b6b --- /dev/null +++ b/src/main/logback.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8"?> +<configuration> + + <!-- 格式化输出:%date表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度 %msg:日志消息,%n是换行符--> + <property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" /> + <!-- 定义日志存储的路径,不要配置相对路径 --> + <property name="LOG_FILE_PATH" value="E:/logs/demo.%d{yyyy-MM-dd}.%i.log" /> + + <!-- 控制台输出日志 --> + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <!-- 按照上面配置的LOG_PATTERN来打印日志 --> + 
<pattern>${LOG_PATTERN}</pattern> + </encoder> + </appender> + + <!--每天生成一个日志文件,保存30天的日志文件。rollingFile是用来切分文件的 --> + <appender name="FILE" + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> + <fileNamePattern>${LOG_FILE_PATH}</fileNamePattern> + <!-- keep 15 days' worth of history --> + <maxHistory>30</maxHistory> + <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"> + <!-- 日志文件的最大大小 --> + <maxFileSize>20MB</maxFileSize> + </timeBasedFileNamingAndTriggeringPolicy> + </rollingPolicy> + + <encoder> + <pattern>${LOG_PATTERN}</pattern> + </encoder> + </appender> + <!-- project default level项目输出的日志级别 --> + <logger name="com.example.demo" level="DEBUG" /> + + <!-- 日志输出级别 常用的日志级别按照从高到低依次为:ERROR、WARN、INFO、DEBUG。 --> + <root level="INFO"> + <appender-ref ref="CONSOLE" /> + <appender-ref ref="FILE" /><!--对应appender name="FILE"。 --> + </root> +</configuration>
\ No newline at end of file diff --git a/src/test/java/com/zdjizhi/KafkaLogSend.java b/src/test/java/com/zdjizhi/KafkaLogSend.java new file mode 100644 index 0000000..5c3feb3 --- /dev/null +++ b/src/test/java/com/zdjizhi/KafkaLogSend.java @@ -0,0 +1,92 @@ +package com.zdjizhi; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.DefaultProConfig; +import org.apache.kafka.clients.producer.*; + +import java.util.Properties; + +/** + * NTC系统配置产生日志写入数据中心类 + * + * @author Administrator + * @create 2018-08-13 15:11 + */ + +public class KafkaLogSend { + private static final Log logger = LogFactory.get(); + + /** + * kafka生产者,用于向kafka中发送消息 + */ + private static org.apache.kafka.clients.producer.Producer<String, String> kafkaProducer; + + /** + * kafka生产者适配器(单例),用来代理kafka生产者发送消息 + */ + private static KafkaLogSend kafkaLogSend; + + private KafkaLogSend() { + initKafkaProducer(); + } + + public static KafkaLogSend getInstance() { + if (kafkaLogSend == null) { + kafkaLogSend = new KafkaLogSend(); + } + return kafkaLogSend; + } + + + public void sendMessage(String message) { +// for (String value : list) { + kafkaProducer.send(new ProducerRecord<>("test", message), new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception != null) { + logger.error("写入test出现异常", exception); + } + } + }); +// } +// kafkaProducer.flush(); + logger.debug("Log sent to National Center successfully!!!!!"); + } + + /** + * 根据kafka生产者配置信息初始化kafka消息生产者,只初始化一次 + */ + private void initKafkaProducer() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", "192.168.44.33:9093,192.168.44.34:9093,192.168.44.35:9093"); + properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("acks", "1"); + properties.put("retries", 
DefaultProConfig.RETRIES); + properties.put("linger.ms", DefaultProConfig.LINGER_MS); + properties.put("request.timeout.ms", DefaultProConfig.REQUEST_TIMEOUT_MS); + properties.put("batch.size", DefaultProConfig.BATCH_SIZE); + properties.put("buffer.memory", DefaultProConfig.BUFFER_MEMORY); + properties.put("max.request.size", DefaultProConfig.MAX_REQUEST_SIZE); + + properties.put("security.protocol", "SSL"); + properties.put("ssl.keystore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.keystore.jks"); + properties.put("ssl.keystore.password", "ceiec2019"); + properties.put("ssl.truststore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.truststore.jks"); + properties.put("ssl.truststore.password", "ceiec2019"); + properties.put("ssl.key.password", "ceiec2019"); + + + /* + * kafka限流配置-20201117 + */ +// properties.put(ProducerConfig.CLIENT_ID_CONFIG, VoipRelationConfig.PRODUCER_CLIENT_ID); +// properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, VoipRelationConfig.PRODUCER_KAFKA_COMPRESSION_TYPE); + + + kafkaProducer = new KafkaProducer<>(properties); + } + + +} diff --git a/src/test/java/com/zdjizhi/KafkaTest.java b/src/test/java/com/zdjizhi/KafkaTest.java new file mode 100644 index 0000000..3bb6d1c --- /dev/null +++ b/src/test/java/com/zdjizhi/KafkaTest.java @@ -0,0 +1,53 @@ +package com.zdjizhi; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import org.apache.kafka.clients.producer.*; + +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2021/8/217:39 + */ +public class KafkaTest { + private static final Log logger = LogFactory.get(); + + public static void main(String[] args) { + Properties properties = new Properties(); + properties.put("bootstrap.servers", "192.168.44.33:9093,192.168.44.34:9093,192.168.44.35:9093"); + properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer"); + properties.put("acks", "1"); +// properties.put("retries", DefaultProConfig.RETRIES); +// properties.put("linger.ms", DefaultProConfig.LINGER_MS); +// properties.put("request.timeout.ms", DefaultProConfig.REQUEST_TIMEOUT_MS); +// properties.put("batch.size", DefaultProConfig.BATCH_SIZE); +// properties.put("buffer.memory", DefaultProConfig.BUFFER_MEMORY); +// properties.put("max.request.size", DefaultProConfig.MAX_REQUEST_SIZE); + + properties.put("security.protocol", "SSL"); +// properties.put("ssl.keystore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.keystore.jks"); + properties.put("ssl.keystore.location", "/usr/ca/client/client.keystore.jks"); + properties.put("ssl.keystore.password", "ceiec2019"); +// properties.put("ssl.truststore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.truststore.jks"); + properties.put("ssl.truststore.location", "/usr/ca/trust/client.truststore.jks"); + properties.put("ssl.truststore.password", "ceiec2019"); + properties.put("ssl.key.password", "ceiec2019"); + + Producer<String, String> producer = new KafkaProducer<String, String>(properties); + + producer.send(new ProducerRecord<>("test", "hello!"), new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception != null) { + logger.error("写入test出现异常", exception); + } + } + }); + + producer.close(); + } +} diff --git a/src/test/java/com/zdjizhi/LocationTest.java b/src/test/java/com/zdjizhi/LocationTest.java new file mode 100644 index 0000000..e7b2d15 --- /dev/null +++ b/src/test/java/com/zdjizhi/LocationTest.java @@ -0,0 +1,28 @@ +package com.zdjizhi; + +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.IpLookup; +import org.junit.Test; + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2021/8/1811:34 + */ +public class LocationTest { + private static IpLookup ipLookup = new IpLookup.Builder(false) 
+ .loadDataFileV4("D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ip_v4.mmdb") + .loadDataFileV6("D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ip_v6.mmdb") + .loadDataFilePrivateV4("D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ip_private_v4.mmdb") + .loadDataFilePrivateV6("D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ip_private_v6.mmdb") + .build(); + + @Test + public void IpLocationTest() { + System.out.println(ipLookup.cityLookupDetail("24.241.112.0")); + System.out.println(ipLookup.cityLookupDetail("1.1.1.1")); + System.out.println(ipLookup.cityLookupDetail("192.168.50.58")); + System.out.println(ipLookup.cityLookupDetail("2600:1700:9010::")); + } +} |
