author  wangchengcheng <[email protected]>  2021-12-16 10:00:55 +0800
committer  wangchengcheng <[email protected]>  2021-12-16 10:00:55 +0800
commit  0d777c206dc3fde7ce1615ed45150c61babe0adb (patch)
tree  1b2c52d719d7f44bbe4300de14bd6c63bbc4fcea
parent  70272eb6ec98ba5b3d081dcba94a4e0b3c33505b (diff)
1. Update to log-completion version 11 (develop)
2. Expand the supported file name suffix types
-rw-r--r--  .idea/workspace.xml | 177
-rw-r--r--  dependency-reduced-pom.xml | 214
-rw-r--r--  src/main/java/com/zdjizhi/bean/FileMeta.java | 51
-rw-r--r--  src/main/java/com/zdjizhi/bean/SourceList.java | 22
-rw-r--r--  src/main/java/com/zdjizhi/common/FlowWriteConfig.java | 82
-rw-r--r--  src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java | 97
-rw-r--r--  src/main/java/com/zdjizhi/utils/app/AppUtils.java | 123
-rw-r--r--  src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java | 18
-rw-r--r--  src/main/java/com/zdjizhi/utils/fast/TransFormFast.java | 153
-rw-r--r--  src/main/java/com/zdjizhi/utils/fast/TransFunctionFast.java | 239
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/DealFileProcessFunction.java | 123
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java | 17
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java | 21
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/ObjectCompletedFunction.java | 20
-rw-r--r--  src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java | 23
-rw-r--r--  src/main/java/com/zdjizhi/utils/general/FileEdit.java | 51
-rw-r--r--  src/main/java/com/zdjizhi/utils/general/SnowflakeId.java | 213
-rw-r--r--  src/main/java/com/zdjizhi/utils/general/TransFormMap.java | 145
-rw-r--r--  src/main/java/com/zdjizhi/utils/general/TransFormObject.java | 153
-rw-r--r--  src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java | 151
-rw-r--r--  src/main/java/com/zdjizhi/utils/general/TransFunction.java | 280
-rw-r--r--  src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java | 210
-rw-r--r--  src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java | 77
-rw-r--r--  src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java | 328
-rw-r--r--  src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java | 187
-rw-r--r--  src/main/java/com/zdjizhi/utils/json/TypeUtils.java | 171
-rw-r--r--  src/main/java/com/zdjizhi/utils/kafka/CertUtils.java | 36
-rw-r--r--  src/main/java/com/zdjizhi/utils/kafka/Consumer.java | 44
-rw-r--r--  src/main/java/com/zdjizhi/utils/kafka/Producer.java | 79
-rw-r--r--  src/main/java/com/zdjizhi/utils/ordinary/MD5Utils.java | 64
-rw-r--r--  src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java | 81
-rw-r--r--  src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java | 190
-rw-r--r--  src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java | 140
-rw-r--r--  src/main/log4j.properties | 25
-rw-r--r--  src/main/logback.xml | 42
-rw-r--r--  src/test/java/com/zdjizhi/FIleTypeTest.java | 13
-rw-r--r--  src/test/java/com/zdjizhi/KafkaTest.java | 55
-rw-r--r--  src/test/java/com/zdjizhi/LocationTest.java | 28
-rw-r--r--  target/classes/com/zdjizhi/bean/FileMeta.class | bin 0 -> 1561 bytes
-rw-r--r--  target/classes/com/zdjizhi/bean/SourceList.class | bin 0 -> 802 bytes
-rw-r--r--  target/classes/com/zdjizhi/common/FlowWriteConfig.class | bin 0 -> 3980 bytes
-rw-r--r--  target/classes/com/zdjizhi/topology/LogFlowWriteTopology.class | bin 0 -> 5637 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/app/AppUtils$1.class | bin 0 -> 1371 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/app/AppUtils.class | bin 0 -> 4560 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/exception/FlowWriteException.class | bin 0 -> 490 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/fast/TransFormFast.class | bin 0 -> 3254 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/fast/TransFunctionFast.class | bin 0 -> 6937 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/functions/DealFileProcessFunction$1.class | bin 0 -> 616 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/functions/DealFileProcessFunction.class | bin 0 -> 6486 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/functions/FilterNullFunction.class | bin 0 -> 871 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/functions/MapCompletedFunction.class | bin 0 -> 934 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/functions/ObjectCompletedFunction.class | bin 0 -> 946 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/functions/TypeMapCompletedFunction.class | bin 0 -> 950 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/general/FileEdit.class | bin 0 -> 2443 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/general/SnowflakeId.class | bin 0 -> 4120 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/general/TransFormMap.class | bin 0 -> 4757 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/general/TransFormObject.class | bin 0 -> 4810 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/general/TransFormTypeMap.class | bin 0 -> 4868 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/general/TransFunction.class | bin 0 -> 7822 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/hbase/HBaseUtils$1.class | bin 0 -> 1394 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/hbase/HBaseUtils.class | bin 0 -> 8581 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/http/HttpClientUtil.class | bin 0 -> 3800 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/json/JsonParseUtil.class | bin 0 -> 9348 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/json/JsonTypeUtils.class | bin 0 -> 4732 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/json/TypeUtils.class | bin 0 -> 3357 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/kafka/CertUtils.class | bin 0 -> 1782 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/kafka/Consumer.class | bin 0 -> 2286 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/kafka/Producer.class | bin 0 -> 2767 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/ordinary/MD5Utils.class | bin 0 -> 1948 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/system/FlowWriteConfigurations.class | bin 0 -> 3175 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/zookeeper/DistributedLock$LockException.class | bin 0 -> 973 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/zookeeper/DistributedLock.class | bin 0 -> 6206 bytes
-rw-r--r--  target/classes/com/zdjizhi/utils/zookeeper/ZookeeperUtils.class | bin 0 -> 5517 bytes
-rw-r--r--  target/classes/default_config.properties | 54
-rw-r--r--  target/classes/file_type.properties | 5
-rw-r--r--  target/classes/service_flow_config.properties | 73
-rw-r--r--  target/log-completion-doublewrite-1214.jar | bin 0 -> 78158100 bytes
-rw-r--r--  target/maven-archiver/pom.properties | 5
-rw-r--r--  target/original-log-completion-doublewrite-1214.jar | bin 0 -> 72671 bytes
-rw-r--r--  target/surefire-reports/TEST-com.zdjizhi.LocationTest.xml | 68
-rw-r--r--  target/surefire-reports/com.zdjizhi.LocationTest.txt | 4
-rw-r--r--  target/test-classes/com/zdjizhi/FIleTypeTest.class | bin 0 -> 995 bytes
-rw-r--r--  target/test-classes/com/zdjizhi/KafkaTest$1.class | bin 0 -> 936 bytes
-rw-r--r--  target/test-classes/com/zdjizhi/KafkaTest.class | bin 0 -> 2220 bytes
-rw-r--r--  target/test-classes/com/zdjizhi/LocationTest.class | bin 0 -> 1497 bytes
85 files changed, 4271 insertions(+), 81 deletions(-)
diff --git a/.idea/workspace.xml b/.idea/workspace.xml
index 0dc48fe..7c76dd7 100644
--- a/.idea/workspace.xml
+++ b/.idea/workspace.xml
@@ -2,83 +2,90 @@
<project version="4">
<component name="ChangeListManager">
<list default="true" id="dfd1cd53-a804-4106-9206-5126890781e9" name="Default Changelist" comment="">
- <change afterPath="$PROJECT_DIR$/log-completion-doublewrite.iml" afterDir="false" />
- <change afterPath="$PROJECT_DIR$/properties/file_type.properties" afterDir="false" />
- <change beforePath="$PROJECT_DIR$/.idea/compiler.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/compiler.xml" afterDir="false" />
- <change beforePath="$PROJECT_DIR$/.idea/modules.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/modules.xml" afterDir="false" />
- <change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
- <change beforePath="$PROJECT_DIR$/pom.xml" beforeDir="false" afterPath="$PROJECT_DIR$/pom.xml" afterDir="false" />
- <change beforePath="$PROJECT_DIR$/properties/default_config.properties" beforeDir="false" afterPath="$PROJECT_DIR$/properties/default_config.properties" afterDir="false" />
- <change beforePath="$PROJECT_DIR$/properties/service_flow_config.properties" beforeDir="false" afterPath="$PROJECT_DIR$/properties/service_flow_config.properties" afterDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/bean/FileMeta.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/bean/SourceList.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/common/FlowWriteConfig.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/app/AppUtils.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/functions/DealFileProcessFunction.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/general/FileEdit.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/general/TransFormMap.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/general/TransFormObject.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/general/TransFunction.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/json/TypeUtils.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/kafka/Consumer.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/kafka/Producer.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/ordinary/MD5Utils.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/log4j.properties" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/main/logback.xml" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/test/java/com/zdjizhi/KafkaTest.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/test/java/com/zdjizhi/LocationTest.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/src/test/java/com/zdjizhi/TestTime.java" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/bean/FileMeta.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/bean/SourceList.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/common/FlowWriteConfig.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/topology/LogFlowWriteTopology.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/app/AppUtils$1.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/app/AppUtils.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/exception/FlowWriteException.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/functions/DealFileProcessFunction$1.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/functions/DealFileProcessFunction.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/functions/FilterNullFunction.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/functions/MapCompletedFunction.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/general/FileEdit.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/general/SnowflakeId.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/general/TransFormMap.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/general/TransFormObject.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/general/TransFormTypeMap.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/general/TransFunction.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/hbase/HBaseUtils$1.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/hbase/HBaseUtils.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/http/HttpClientUtil.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/json/JsonParseUtil.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/json/JsonTypeUtils.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/json/TypeUtils.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/kafka/CertUtils.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/kafka/Consumer.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/kafka/Producer.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/ordinary/MD5Utils.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/system/FlowWriteConfigurations.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/zookeeper/DistributedLock$LockException.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/zookeeper/DistributedLock.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/zookeeper/ZookeeperUtils.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/default_config.properties" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/classes/service_flow_config.properties" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/test-classes/com/zdjizhi/KafkaTest$1.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/test-classes/com/zdjizhi/KafkaTest.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/test-classes/com/zdjizhi/LocationTest.class" beforeDir="false" />
- <change beforePath="$PROJECT_DIR$/target/test-classes/com/zdjizhi/TestTime.class" beforeDir="false" />
+ <change afterPath="$PROJECT_DIR$/dependency-reduced-pom.xml" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/bean/FileMeta.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/bean/SourceList.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/common/FlowWriteConfig.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/app/AppUtils.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/fast/TransFormFast.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/fast/TransFunctionFast.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/functions/DealFileProcessFunction.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/functions/ObjectCompletedFunction.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/general/FileEdit.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/general/TransFormMap.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/general/TransFormObject.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/general/TransFunction.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/json/TypeUtils.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/kafka/Consumer.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/kafka/Producer.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/ordinary/MD5Utils.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/log4j.properties" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/main/logback.xml" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/test/java/com/zdjizhi/FIleTypeTest.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/test/java/com/zdjizhi/KafkaTest.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/src/test/java/com/zdjizhi/LocationTest.java" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/bean/FileMeta.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/bean/SourceList.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/common/FlowWriteConfig.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/topology/LogFlowWriteTopology.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/app/AppUtils$1.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/app/AppUtils.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/exception/FlowWriteException.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/fast/TransFormFast.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/fast/TransFunctionFast.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/functions/DealFileProcessFunction$1.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/functions/DealFileProcessFunction.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/functions/FilterNullFunction.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/functions/MapCompletedFunction.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/functions/ObjectCompletedFunction.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/functions/TypeMapCompletedFunction.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/general/FileEdit.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/general/SnowflakeId.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/general/TransFormMap.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/general/TransFormObject.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/general/TransFormTypeMap.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/general/TransFunction.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/hbase/HBaseUtils$1.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/hbase/HBaseUtils.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/http/HttpClientUtil.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/json/JsonParseUtil.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/json/JsonTypeUtils.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/json/TypeUtils.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/kafka/CertUtils.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/kafka/Consumer.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/kafka/Producer.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/ordinary/MD5Utils.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/system/FlowWriteConfigurations.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/zookeeper/DistributedLock$LockException.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/zookeeper/DistributedLock.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/com/zdjizhi/utils/zookeeper/ZookeeperUtils.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/default_config.properties" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/file_type.properties" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/classes/service_flow_config.properties" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/log-completion-doublewrite-1214.jar" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/maven-archiver/pom.properties" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/original-log-completion-doublewrite-1214.jar" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/surefire-reports/TEST-com.zdjizhi.LocationTest.xml" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/surefire-reports/com.zdjizhi.LocationTest.txt" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/test-classes/com/zdjizhi/FIleTypeTest.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/test-classes/com/zdjizhi/KafkaTest$1.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/test-classes/com/zdjizhi/KafkaTest.class" afterDir="false" />
+ <change afterPath="$PROJECT_DIR$/target/test-classes/com/zdjizhi/LocationTest.class" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
@@ -196,13 +203,21 @@
<option name="project" value="LOCAL" />
<updated>1634902912230</updated>
</task>
- <option name="localTasksCounter" value="2" />
+ <task id="LOCAL-00002" summary="1.更新为日志补全11版本&#10;2.完善文件名后缀种类">
+ <created>1639619962300</created>
+ <option name="number" value="00002" />
+ <option name="presentableId" value="LOCAL-00002" />
+ <option name="project" value="LOCAL" />
+ <updated>1639619962300</updated>
+ </task>
+ <option name="localTasksCounter" value="3" />
<servers />
</component>
<component name="VcsManagerConfiguration">
<MESSAGE value="针对p19项目研发的双写程序" />
<MESSAGE value="为完善文件名后缀的版本" />
- <option name="LAST_COMMIT_MESSAGE" value="为完善文件名后缀的版本" />
+ <MESSAGE value="1.更新为日志补全11版本&#10;2.完善文件名后缀种类" />
+ <option name="LAST_COMMIT_MESSAGE" value="1.更新为日志补全11版本&#10;2.完善文件名后缀种类" />
</component>
<component name="WindowStateProjectService">
<state x="467" y="69" key="#LogFlowWriteTopology" timestamp="1639551343993">
@@ -213,10 +228,10 @@
<screen x="0" y="0" width="1536" height="824" />
</state>
<state x="223" y="67" key="#com.intellij.execution.impl.EditConfigurationsDialog/[email protected]" timestamp="1639555238664" />
- <state x="322" y="6" width="891" height="814" key="CommitChangelistDialog2" timestamp="1638771902930">
+ <state x="322" y="6" width="891" height="814" key="CommitChangelistDialog2" timestamp="1639619960582">
<screen x="0" y="0" width="1536" height="824" />
</state>
- <state x="322" y="6" width="891" height="814" key="CommitChangelistDialog2/[email protected]" timestamp="1638771902930" />
+ <state x="322" y="6" width="891" height="814" key="CommitChangelistDialog2/[email protected]" timestamp="1639619960582" />
<state x="93" y="93" width="1350" height="638" key="DiffContextDialog" timestamp="1638771862046">
<screen x="0" y="0" width="1536" height="824" />
</state>
diff --git a/dependency-reduced-pom.xml b/dependency-reduced-pom.xml
new file mode 100644
index 0000000..300cf24
--- /dev/null
+++ b/dependency-reduced-pom.xml
@@ -0,0 +1,214 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>com.zdjizhi</groupId>
+ <artifactId>log-completion-doublewrite</artifactId>
+ <name>log-completion-doublewrite</name>
+ <version>1214</version>
+ <url>http://www.example.com</url>
+ <build>
+ <resources>
+ <resource>
+ <directory>properties</directory>
+ <includes>
+ <include>**/*.properties</include>
+ <include>**/*.xml</include>
+ </includes>
+ </resource>
+ <resource>
+ <directory>src\main\java</directory>
+ <includes>
+ <include>log4j.properties</include>
+ </includes>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.4.2</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer>
+ <mainClass>com.zdjizhi.topology.LogFlowWriteTopology</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>io.github.zlika</groupId>
+ <artifactId>reproducible-build-maven-plugin</artifactId>
+ <version>0.2</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>strip-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <repositories>
+ <repository>
+ <id>nexus</id>
+ <name>Team Nexus Repository</name>
+ <url>http://192.168.40.125:8099/content/groups/public</url>
+ </repository>
+ <repository>
+ <releases />
+ <snapshots>
+ <checksumPolicy>fail</checksumPolicy>
+ </snapshots>
+ <id>maven-ali</id>
+ <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
+ </repository>
+ </repositories>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>1.13.1</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>flink-annotations</artifactId>
+ <groupId>org.apache.flink</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>flink-metrics-core</artifactId>
+ <groupId>org.apache.flink</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>flink-shaded-asm-7</artifactId>
+ <groupId>org.apache.flink</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>kryo</artifactId>
+ <groupId>com.esotericsoftware.kryo</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>flink-shaded-guava</artifactId>
+ <groupId>org.apache.flink</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>force-shading</artifactId>
+ <groupId>org.apache.flink</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.12</artifactId>
+ <version>1.13.1</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>flink-file-sink-common</artifactId>
+ <groupId>org.apache.flink</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>flink-runtime_2.12</artifactId>
+ <groupId>org.apache.flink</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>flink-shaded-guava</artifactId>
+ <groupId>org.apache.flink</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>force-shading</artifactId>
+ <groupId>org.apache.flink</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_2.12</artifactId>
+ <version>1.13.1</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>flink-optimizer_2.12</artifactId>
+ <groupId>org.apache.flink</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>flink-runtime_2.12</artifactId>
+ <groupId>org.apache.flink</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>force-shading</artifactId>
+ <groupId>org.apache.flink</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka_2.12</artifactId>
+ <version>1.13.1</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>kafka-clients</artifactId>
+ <groupId>org.apache.kafka</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>flink-connector-base</artifactId>
+ <groupId>org.apache.flink</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>force-shading</artifactId>
+ <groupId>org.apache.flink</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>1.13.1</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>force-shading</artifactId>
+ <groupId>org.apache.flink</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>hamcrest-core</artifactId>
+ <groupId>org.hamcrest</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ <properties>
+ <hbase.version>2.2.3</hbase.version>
+ <hadoop.version>2.7.1</hadoop.version>
+ <flink.version>1.13.1</flink.version>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <scope.type>provided</scope.type>
+ <kafka.version>1.0.0</kafka.version>
+ </properties>
+</project>
+
diff --git a/src/main/java/com/zdjizhi/bean/FileMeta.java b/src/main/java/com/zdjizhi/bean/FileMeta.java
new file mode 100644
index 0000000..e24e0b4
--- /dev/null
+++ b/src/main/java/com/zdjizhi/bean/FileMeta.java
@@ -0,0 +1,51 @@
+package com.zdjizhi.bean;
+
+import com.alibaba.fastjson.JSONArray;
+
+public class FileMeta {
+ private long common_log_id;
+ protected int common_recv_time;
+ private String common_schema_type;
+ private JSONArray sourceList;
+ private int processing_time;
+
+ public long getCommon_log_id() {
+ return common_log_id;
+ }
+
+ public void setCommon_log_id(long common_log_id) {
+ this.common_log_id = common_log_id;
+ }
+
+ public int getCommon_recv_time() {
+ return common_recv_time;
+ }
+
+ public void setCommon_recv_time(int common_recv_time) {
+ this.common_recv_time = common_recv_time;
+ }
+
+ public String getCommon_schema_type() {
+ return common_schema_type;
+ }
+
+ public void setCommon_schema_type(String common_schema_type) {
+ this.common_schema_type = common_schema_type;
+ }
+
+ public JSONArray getSourceList() {
+ return sourceList;
+ }
+
+ public void setSourceList(JSONArray sourceList) {
+ this.sourceList = sourceList;
+ }
+
+ public int getProcessing_time() {
+ return processing_time;
+ }
+
+ public void setProcessing_time(int processing_time) {
+ this.processing_time = processing_time;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/bean/SourceList.java b/src/main/java/com/zdjizhi/bean/SourceList.java
new file mode 100644
index 0000000..8fba85d
--- /dev/null
+++ b/src/main/java/com/zdjizhi/bean/SourceList.java
@@ -0,0 +1,22 @@
+package com.zdjizhi.bean;
+
+public class SourceList {
+ private String destination_oss_path;
+ private String source_oss_path;
+
+ public String getDestination_oss_path() {
+ return destination_oss_path;
+ }
+
+ public void setDestination_oss_path(String destination_oss_path) {
+ this.destination_oss_path = destination_oss_path;
+ }
+
+ public String getSource_oss_path() {
+ return source_oss_path;
+ }
+
+ public void setSource_oss_path(String source_oss_path) {
+ this.source_oss_path = source_oss_path;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
new file mode 100644
index 0000000..c6f4ebb
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -0,0 +1,82 @@
+package com.zdjizhi.common;
+
+
+import com.zdjizhi.utils.system.FlowWriteConfigurations;
+
+/**
+ * @author Administrator
+ */
+public class FlowWriteConfig {
+
+ public static final int IF_PARAM_LENGTH = 3;
+ public static final String VISIBILITY = "disabled";
+ public static final String FORMAT_SPLITTER = ",";
+ public static final String IS_JSON_KEY_TAG = "$.";
+ public static final String IF_CONDITION_SPLITTER = "=";
+ public static final String MODEL = "remote";
+ public static final String PROTOCOL_SPLITTER = "\\.";
+
+ /**
+ * System config
+ */
+
+ public static final String INPUT_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "input.kafka.servers");
+ public static final Integer SOURCE_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "source.parallelism");
+ public static final Integer SINK_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "sink.parallelism");
+ public static final Integer TRANSFORM_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "transform.parallelism");
+ public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
+ public static final Integer APP_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "app.tick.tuple.freq.secs");
+ public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num");
+ public static final Integer LOG_NEED_COMPLETE = FlowWriteConfigurations.getIntProperty(0, "log.need.complete");
+ public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(0, "mail.default.charset");
+ public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name");
+ public static final Integer LOG_TRANSFORM_TYPE = FlowWriteConfigurations.getIntProperty(1, "log.transform.type");
+ public static final Integer BUFFER_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "buffer.timeout");
+
+ /**
+ * kafka source config
+ */
+ public static final String INPUT_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "input.kafka.topic");
+ public static final String SESSION_TIMEOUT_MS = FlowWriteConfigurations.getStringProperty(1, "session.timeout.ms");
+ public static final String MAX_POLL_RECORDS = FlowWriteConfigurations.getStringProperty(1, "max.poll.records");
+ public static final String MAX_PARTITION_FETCH_BYTES = FlowWriteConfigurations.getStringProperty(1, "max.partition.fetch.bytes");
+
+
+ /**
+ * kafka sink config
+ */
+ public static final String OUTPUT_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "output.kafka.servers");
+ public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers");
+ public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
+ public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id");
+ public static final String SOURCE_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic");
+ public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(0, "producer.ack");
+ public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");
+ public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");
+ public static final String KAFKA_SOURCE_PROTOCOL = FlowWriteConfigurations.getStringProperty(1, "kafka.source.protocol");
+ public static final String KAFKA_SINK_PROTOCOL = FlowWriteConfigurations.getStringProperty(1, "kafka.sink.protocol");
+ public static final String KAFKA_USER = FlowWriteConfigurations.getStringProperty(1, "kafka.user");
+ public static final String KAFKA_PIN = FlowWriteConfigurations.getStringProperty(1, "kafka.pin");
+ public static final String OUTPUT_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "output.kafka.topic");
+
+
+ public static final String PERCENT_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "percent.kafka.topic");
+
+ /**
+ * connection kafka
+ */
+ public static final String RETRIES = FlowWriteConfigurations.getStringProperty(1, "retries");
+ public static final String LINGER_MS = FlowWriteConfigurations.getStringProperty(1, "linger.ms");
+ public static final Integer REQUEST_TIMEOUT_MS = FlowWriteConfigurations.getIntProperty(1, "request.timeout.ms");
+ public static final Integer BATCH_SIZE = FlowWriteConfigurations.getIntProperty(1, "batch.size");
+ public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory");
+ public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size");
+
+ /**
+ * http
+ */
+ public static final String SCHEMA_HTTP = FlowWriteConfigurations.getStringProperty(0, "schema.http");
+ public static final String APP_ID_HTTP = FlowWriteConfigurations.getStringProperty(0, "app.id.http");
+
+ public static final String OOS_SERVERS = FlowWriteConfigurations.getStringProperty(0, "oos.servers");
+}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
new file mode 100644
index 0000000..1f5d81a
--- /dev/null
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -0,0 +1,97 @@
+package com.zdjizhi.topology;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.functions.*;
+import com.zdjizhi.utils.kafka.Consumer;
+import com.zdjizhi.utils.kafka.Producer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.topology
+ * @Description:
+ * @date 2021/5/20 16:42
+ */
+public class LogFlowWriteTopology {
+ private static final Log logger = LogFactory.get();
+
+ public static void main(String[] args) {
+ final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ //Enable checkpointing; interval specifies how often checkpoints are triggered (in milliseconds)
+// environment.enableCheckpointing(5000);
+
+ //Maximum time between two outputs (in milliseconds)
+ environment.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT);
+
+ DataStreamSource<String> streamSource = environment.addSource(Consumer.getKafkaConsumer())
+ .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM);
+
+ if (FlowWriteConfig.LOG_NEED_COMPLETE == 1) {
+ DataStream<String> cleaningLog;
+ switch (FlowWriteConfig.LOG_TRANSFORM_TYPE) {
+ case 0:
+ //Process, complete and transform the raw log; do not validate log field types.
+ cleaningLog = streamSource.map(new MapCompletedFunction()).name("MapCompletedFunction")
+ .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+
+ break;
+ case 1:
+ //Process, complete and transform the raw log; log field types must match the schema exactly.
+ cleaningLog = streamSource.map(new ObjectCompletedFunction()).name("ObjectCompletedFunction")
+ .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+
+ break;
+ case 2:
+ //Process, complete and transform the raw log; apply weak type validation and coerce fields according to the schema.
+ cleaningLog = streamSource.map(new TypeMapCompletedFunction()).name("TypeMapCompletedFunction")
+ .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+
+ break;
+ default:
+ //Process, complete and transform the raw log; do not validate log field types.
+ cleaningLog = streamSource.map(new MapCompletedFunction()).name("MapCompletedFunction")
+ .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+
+ }
+
+// //Filter out empty records so they are not sent to Kafka
+ DataStream<String> result = cleaningLog.filter(new FilterNullFunction()).name("FilterAbnormalData")
+ .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+
+ //Handle records that carry unstructured (file) logs
+ SingleOutputStreamOperator<String> process = result.process(new DealFileProcessFunction());
+
+ //Send file metadata to TRAFFIC-FILE-METADATA
+ process.getSideOutput(DealFileProcessFunction.metaToKafa).addSink(Producer.getKafkaProducer()).name("toTrafficFileMeta")
+ .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+
+//// //Send the completed data to the Percent (百分点) Kafka cluster
+ process.addSink(Producer.getPercentKafkaProducer()).name("toPercentKafka")
+ .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+ process.getSideOutput(DealFileProcessFunction.metaToKafa).print();
+ } else {
+ //Filter out empty records so they are not sent to Kafka
+ DataStream<String> result = streamSource.filter(new FilterNullFunction()).name("FilterOriginalData")
+ .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+ //Send data to Kafka
+ result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka")
+ .setParallelism(FlowWriteConfig.SINK_PARALLELISM);
+ }
+
+ try {
+ environment.execute(args[0]);
+ } catch (Exception e) {
+ logger.error("This Flink task start ERROR! Exception information is :" + e);
+ e.printStackTrace();
+ }
+
+ }
+
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/app/AppUtils.java b/src/main/java/com/zdjizhi/utils/app/AppUtils.java
new file mode 100644
index 0000000..0caeb25
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/app/AppUtils.java
@@ -0,0 +1,123 @@
+package com.zdjizhi.utils.app;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.http.HttpClientUtil;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * AppId utility class
+ *
+ * @author qidaijie
+ */
+
+public class AppUtils {
+ private static final Log logger = LogFactory.get();
+ private static Map<Integer, String> appIdMap = new ConcurrentHashMap<>(128);
+ private static AppUtils appUtils;
+
+ private static void getAppInstance() {
+ appUtils = new AppUtils();
+ }
+
+
+ /**
+ * Constructor (new)
+ */
+ private AppUtils() {
+ //Schedule periodic updates
+ updateAppIdCache();
+ }
+
+ /**
+ * Update the cached variables
+ */
+ private static void change() {
+ if (appUtils == null) {
+ getAppInstance();
+ }
+ timestampsFilter();
+ }
+
+
+ /**
+ * Fetch the changed content
+ */
+ private static void timestampsFilter() {
+ try {
+ Long begin = System.currentTimeMillis();
+ String schema = HttpClientUtil.requestByGetMethod(FlowWriteConfig.APP_ID_HTTP);
+ if (StringUtil.isNotBlank(schema)) {
+ String data = JSONObject.parseObject(schema).getString("data");
+ JSONArray objects = JSONArray.parseArray(data);
+ for (Object object : objects) {
+ JSONArray jsonArray = JSONArray.parseArray(object.toString());
+ int key = jsonArray.getInteger(0);
+ String value = jsonArray.getString(1);
+ if (appIdMap.containsKey(key)) {
+ if (!value.equals(appIdMap.get(key))) {
+ appIdMap.put(key, value);
+ }
+ } else {
+ appIdMap.put(key, value);
+ }
+ }
+ logger.warn("Updating the correspondence takes time:" + (begin - System.currentTimeMillis()));
+ logger.warn("Pull the length of the interface data:[" + objects.size() + "]");
+ }
+ } catch (RuntimeException e) {
+ logger.error("Update cache app-id failed, exception:" + e);
+ }
+ }
+
+
+ /**
+ * Validation timer: runs at a fixed interval to re-validate and obtain a new Cookie
+ */
+ private void updateAppIdCache() {
+ ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
+ executorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ if (FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS != 0) {
+ change();
+ }
+ } catch (RuntimeException e) {
+ logger.error("AppUtils update AppCache is error===>{" + e + "}<===");
+ }
+ }
+ }, 1, FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS);
+ }
+
+
+ /**
+ * Get the appName
+ *
+ * @param appId app_id
+ * @return account
+ */
+ public static String getAppName(int appId) {
+
+ if (appUtils == null) {
+ getAppInstance();
+ }
+
+ if (appIdMap.containsKey(appId)) {
+ return appIdMap.get(appId);
+ } else {
+ logger.warn("AppMap get appName is null, ID is :" + appId);
+ return "";
+ }
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java b/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java
new file mode 100644
index 0000000..67c88f0
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java
@@ -0,0 +1,18 @@
+package com.zdjizhi.utils.exception;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.storm.utils.execption
+ * @Description:
+ * @date 2021/3/25 9:42
+ */
+public class FlowWriteException extends RuntimeException {
+
+ public FlowWriteException() {
+ }
+
+ public FlowWriteException(String message) {
+ super(message);
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/fast/TransFormFast.java b/src/main/java/com/zdjizhi/utils/fast/TransFormFast.java
new file mode 100644
index 0000000..96f1219
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/fast/TransFormFast.java
@@ -0,0 +1,153 @@
+package com.zdjizhi.utils.fast;
+
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.general.SnowflakeId;
+import com.zdjizhi.utils.json.JsonParseUtil;
+import com.zdjizhi.utils.json.JsonTypeUtils;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+
+/**
+ * Description: transformation/completion utility class
+ *
+ * @author qidaijie
+ */
+public class TransFormFast {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * Fetch the job list.
+ * Each element of the list is a four-element string array (field carrying the format tag, field to complete, function to apply, extra parameter), e.g.:
+ * (mail_subject mail_subject decode_of_base64 mail_subject_charset)
+ */
+ private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);
+
+// /**
+// * Parse the log and complete it
+// *
+// * @param message raw log from the Kafka topic
+// * @return the completed log
+// */
+ @SuppressWarnings("unchecked")
+// public static String dealCommonMessage(String message) {
+// try {
+// if (StringUtil.isNotBlank(message)) {
+// JSONObject jsonMap = JSONObject.parseObject(message);
+// JsonParseUtil.dropJsonField(jsonMap);
+// for (String[] strings : jobList) {
+// //value of the referenced source field
+// Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
+// //key of the field to complete
+// String appendToKeyName = strings[1];
+// //value of the field to complete
+// Object appendToKeyValue = JsonParseUtil.getValue(jsonMap, appendToKeyName);
+// //field that selects the operation function
+// String function = strings[2];
+// //value of the extra parameter
+// String param = strings[3];
+// functionSet(function, jsonMap, appendToKeyName, appendToKeyValue, logValue, param);
+// }
+//
+// return FastJsonTypeUtils.typeTransform(jsonMap);
+//
+// } else {
+// return "";
+// }
+// } catch (RuntimeException e) {
+// logger.error("Logs TransForm Exception! Error message is:" + e);
+// e.printStackTrace();
+// return "";
+// }
+// }
+
+
+ /**
+ * Set of functions that operate on the corresponding field according to the schema description
+ *
+ * @param function field that selects the operation function
+ * @param jsonMap map parsed from the raw log
+ * @param appendToKeyName key of the field to complete
+ * @param appendToKeyValue value of the field to complete
+ * @param logValue value of the referenced source field
+ * @param param value of the extra parameter
+ */
+ private static void functionSet(String function, JSONObject jsonMap, String appendToKeyName, Object appendToKeyValue, Object logValue, String param) {
+ switch (function) {
+ case "current_timestamp":
+ if (!(appendToKeyValue instanceof Long)) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.getCurrentTime());
+ }
+ break;
+ case "snowflake_id":
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId());
+ break;
+ case "geo_ip_detail":
+ if (logValue != null && appendToKeyValue == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.getGeoIpDetail(logValue.toString()));
+ }
+ break;
+ case "geo_asn":
+ if (logValue != null && appendToKeyValue == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.getGeoAsn(logValue.toString()));
+ }
+ break;
+ case "geo_ip_country":
+ if (logValue != null && appendToKeyValue == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.getGeoIpCountry(logValue.toString()));
+ }
+ break;
+ case "set_value":
+ if (param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.setValue(param));
+ }
+ break;
+ case "get_value":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, logValue);
+ }
+ break;
+ case "if":
+ if (param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.condition(jsonMap, param));
+ }
+ break;
+ case "sub_domain":
+ if (appendToKeyValue == null && logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.getTopDomain(logValue.toString()));
+ }
+ break;
+ case "radius_match":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.radiusMatch(logValue.toString()));
+ }
+ break;
+ case "app_match":
+ if (logValue != null && appendToKeyValue == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.appMatch(logValue.toString()));
+ }
+ break;
+ case "decode_of_base64":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.decodeBase64(logValue.toString(), TransFunctionFast.isJsonValue(jsonMap, param)));
+ }
+ break;
+ case "flattenSpec":
+ if (logValue != null && param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunctionFast.flattenSpec(logValue.toString(), param));
+ }
+ break;
+ default:
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/fast/TransFunctionFast.java b/src/main/java/com/zdjizhi/utils/fast/TransFunctionFast.java
new file mode 100644
index 0000000..eeb2aaa
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/fast/TransFunctionFast.java
@@ -0,0 +1,239 @@
+package com.zdjizhi.utils.fast;
+
+import cn.hutool.core.codec.Base64;
+import cn.hutool.core.text.StrSpliter;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSONObject;
+import com.jayway.jsonpath.InvalidPathException;
+import com.jayway.jsonpath.JsonPath;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.FormatUtils;
+import com.zdjizhi.utils.IpLookup;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.app.AppUtils;
+import com.zdjizhi.utils.hbase.HBaseUtils;
+import com.zdjizhi.utils.json.JsonParseUtil;
+import com.zdjizhi.utils.json.TypeUtils;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * @author qidaijie
+ */
+class TransFunctionFast {
+
+ private static final Log logger = LogFactory.get();
+
+ private static final Pattern PATTERN = Pattern.compile("[0-9]*");
+
+ /**
+ * IP geolocation library helper
+ */
+ private static IpLookup ipLookup = new IpLookup.Builder(false)
+ .loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4.mmdb")
+ .loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6.mmdb")
+ .loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_private_v4.mmdb")
+ .loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_private_v6.mmdb")
+ .loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb")
+ .loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb")
+ .build();
+
+ /**
+ * Generate the current timestamp (in seconds)
+ */
+ static long getCurrentTime() {
+
+ return System.currentTimeMillis() / 1000;
+ }
+
+ /**
+ * Look up location information by client IP
+ *
+ * @param ip client IP
+ * @return detailed IP address information
+ */
+ static String getGeoIpDetail(String ip) {
+
+ return ipLookup.cityLookupDetail(ip);
+
+ }
+
+ /**
+ * Look up ASN information by IP
+ *
+ * @param ip client/server IP
+ * @return ASN
+ */
+ static String getGeoAsn(String ip) {
+
+ return ipLookup.asnLookup(ip);
+ }
+
+ /**
+ * Look up country information by IP
+ *
+ * @param ip server IP
+ * @return country
+ */
+ static String getGeoIpCountry(String ip) {
+
+ return ipLookup.countryLookup(ip);
+ }
+
+
+ /**
+ * Complete the radius account via HBase
+ *
+ * @param ip client IP
+ * @return account
+ */
+ static String radiusMatch(String ip) {
+ return HBaseUtils.getAccount(ip.trim());
+ }
+
+ /**
+ * Complete appName from the cached appId-to-name mapping
+ *
+ * @param appIds list of app ids
+ * @return appName
+ */
+ static String appMatch(String appIds) {
+ try {
+ String appId = StrSpliter.split(appIds, FlowWriteConfig.FORMAT_SPLITTER, true, true).get(0);
+ return AppUtils.getAppName(Integer.parseInt(appId));
+ } catch (NumberFormatException | ClassCastException exception) {
+ logger.error("APP ID列表分割转换异常,异常APP ID列表:" + appIds);
+ return "";
+ }
+ }
+
+ /**
+ * Resolve the top-level (registrable) domain
+ *
+ * @param domain original domain
+ * @return top-level domain
+ */
+ static String getTopDomain(String domain) {
+ try {
+ return FormatUtils.getTopPrivateDomain(domain);
+ } catch (StringIndexOutOfBoundsException outException) {
+ logger.error("解析顶级域名异常,异常域名:" + domain);
+ return "";
+ }
+ }
+
+ /**
+ * Decode base64 with the given charset
+ *
+ * @param message base64 string
+ * @param charset character set
+ * @return decoded string
+ */
+ static String decodeBase64(String message, Object charset) {
+ String result = "";
+ try {
+ if (StringUtil.isNotBlank(message)) {
+ if (charset == null) {
+ result = Base64.decodeStr(message, FlowWriteConfig.MAIL_DEFAULT_CHARSET);
+ } else {
+ result = Base64.decodeStr(message, charset.toString());
+ }
+ }
+ } catch (RuntimeException rune) {
+ logger.error("解析 Base64 异常,异常信息:" + rune);
+ }
+ return result;
+ }
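+
+    /*
+     * Illustrative use of decodeBase64() (not part of the original source): when charset is null,
+     * FlowWriteConfig.MAIL_DEFAULT_CHARSET is used, otherwise the supplied charset name.
+     *
+     *   decodeBase64("aGVsbG8=", "UTF-8");   // -> "hello"
+     *   decodeBase64("aGVsbG8=", null);      // -> decoded with the configured default charset
+     */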
+
+ /**
+     * Parse JSON with a JsonPath expression
+     *
+     * @param message json
+     * @param expr    JsonPath expression
+     * @return first matched result
+ */
+ static String flattenSpec(String message, String expr) {
+ String flattenResult = "";
+ try {
+ if (StringUtil.isNotBlank(expr)) {
+ ArrayList<String> read = JsonPath.parse(message).read(expr);
+ if (read.size() >= 1) {
+ flattenResult = read.get(0);
+ }
+ }
+ } catch (ClassCastException | InvalidPathException | ArrayIndexOutOfBoundsException e) {
+ logger.error("设备标签解析异常,[ " + expr + " ]解析表达式错误" + e);
+ }
+ return flattenResult;
+ }
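+
+    /*
+     * Illustrative use of flattenSpec() (example data only): the JsonPath expression is evaluated
+     * against the message and only the first match is returned.
+     *
+     *   // message = {"tags":[{"value":"cpe-01"},{"value":"cpe-02"}]}
+     *   flattenSpec(message, "$.tags[*].value");   // -> "cpe-01"
+     */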
+
+
+ /**
+     * If the parameter refers to a log field, return that field's value; otherwise return the raw string
+     *
+     * @param jsonMap in-memory log object
+     * @param param   field name or plain string
+ * @return JSON.Value or String
+ */
+ static Object isJsonValue(JSONObject jsonMap, String param) {
+ if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) {
+ return JsonParseUtil.getValue(jsonMap, param.substring(2));
+ } else {
+ return param;
+ }
+ }
+
+
+ /**
+     * IF function: builds a ternary expression from the log; numeric operands are parsed before comparison
+     *
+     * @param jsonMap in-memory log object
+     * @param ifParam condition expression (field names or literals)
+ * @return resultA or resultB or null
+ */
+ static Object condition(JSONObject jsonMap, String ifParam) {
+ Object result = null;
+ try {
+ String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
+ if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
+ String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
+ Object direction = isJsonValue(jsonMap, norms[0]);
+ Object resultA = isJsonValue(jsonMap, split[1]);
+ Object resultB = isJsonValue(jsonMap, split[2]);
+ if (direction instanceof Number) {
+ result = (Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB;
+ } else if (direction instanceof String) {
+ result = direction.equals(norms[1]) ? resultA : resultB;
+ }
+ }
+ } catch (RuntimeException e) {
+ logger.error("IF 函数执行异常,异常信息:" + e);
+ }
+ return result;
+ }
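+
+    /*
+     * Usage sketch for condition(), assuming FORMAT_SPLITTER is ",", IF_CONDITION_SPLITTER is ":"
+     * and IS_JSON_KEY_TAG is the "$." prefix (the real values are defined in FlowWriteConfig):
+     *
+     *   // ifParam = "common_direction:1,$.sent_bytes,$.recv_bytes"
+     *   // returns the value of sent_bytes when common_direction == 1, otherwise recv_bytes
+     *   condition(jsonMap, "common_direction:1,$.sent_bytes,$.recv_bytes");
+     */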
+
+
+ /**
+     * Set a fixed value; purely numeric parameters are returned as long, otherwise the string itself
+     *
+     * @param param default value
+     * @return number or string
+ */
+ static Object setValue(String param) {
+ try {
+ Matcher isNum = PATTERN.matcher(param);
+ if (isNum.matches()) {
+ return Long.parseLong(param);
+ } else {
+ return param;
+ }
+ } catch (RuntimeException e) {
+ logger.error("SetValue 函数异常,异常信息:" + e);
+ }
+ return null;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/DealFileProcessFunction.java b/src/main/java/com/zdjizhi/utils/functions/DealFileProcessFunction.java
new file mode 100644
index 0000000..782750a
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/DealFileProcessFunction.java
@@ -0,0 +1,123 @@
+package com.zdjizhi.utils.functions;
+
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.bean.FileMeta;
+import com.zdjizhi.bean.SourceList;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.general.FileEdit;
+import com.zdjizhi.utils.json.JsonTypeUtils;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import java.util.Map;
+
+
+/**
+ * @author wangchengcheng
+ * @Package com.zdjizhi.utils.functions
+ * @Description: Rewrites file URL fields and emits file metadata to a Kafka-bound side output
+ * @date 2021/10/14
+ */
+public class DealFileProcessFunction extends ProcessFunction<String,String> {
+ private static final Log logger = LogFactory.get();
+
+ private Map<String, Object> jsonMap = null;
+ private String rpUrlValue;
+ private String rqUrlValue;
+ private String emailUrlValue;
+
+ private long cfgId = 0; //= common_policy_id;
+
+ private String sIp = null; // = common_client_ip;
+ private int sPort = 0;// = common_client_port;
+ private String dIp = null;//= common_server_ip;
+ private int dPort = 0;// = common_server_port;
+ private long foundTime = 0;// = common_recv_time;
+ private String account = null;
+ private String domain = null;
+ private String schemaType = null;
+
+
+    // Side-output tag for file metadata sent to Kafka
+ public static OutputTag<String> metaToKafa = new OutputTag<String>("metaToKafka") {};
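+
+    /*
+     * Minimal wiring sketch (illustration only; the actual wiring lives in LogFlowWriteTopology):
+     *
+     *   SingleOutputStreamOperator<String> enriched = stream.process(new DealFileProcessFunction());
+     *   DataStream<String> fileMeta = enriched.getSideOutput(DealFileProcessFunction.metaToKafa);
+     *   // "enriched" carries the rewritten logs, "fileMeta" the FileMeta JSON destined for Kafka
+     */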
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void processElement(String message, Context context, Collector<String> collector) throws Exception {
+ try {
+ if (StringUtil.isNotBlank(message)) {
+ Map<String, Object> map = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
+ jsonMap = JsonTypeUtils.typeTransform(map);
+ rpUrlValue = (String) jsonMap.get("http_response_body");
+ rqUrlValue = (String) jsonMap.get("http_request_body");
+ emailUrlValue = (String) jsonMap.get("mail_eml_file");
+
+
+
+ if (StringUtil.isNotBlank(rpUrlValue) || StringUtil.isNotBlank(rqUrlValue) || StringUtil.isNotBlank(emailUrlValue)) {
+ cfgId = (long) jsonMap.get("common_policy_id");
+ sIp = (String) jsonMap.get("common_client_ip");
+ sPort = (int) jsonMap.get("common_client_port");
+ dIp = (String) jsonMap.get("common_server_ip");
+ dPort = (int) jsonMap.get("common_server_port");
+ foundTime = (long) jsonMap.get("common_recv_time");
+ schemaType = (String) jsonMap.get("common_schema_type");
+
+ if (StringUtil.isNotBlank((String) jsonMap.get("http_domain"))) {
+ domain = jsonMap.get("http_domain").toString();
+ } else {
+ domain = "NA";
+ }
+ if (StringUtil.isNotBlank((String) jsonMap.get("common_subscribe_id"))) {
+ account = jsonMap.get("common_subscribe_id").toString();
+ } else {
+ account = "NA";
+ }
+
+ FileMeta fileMeta = new FileMeta();
+ JSONArray jsonarray = new JSONArray();
+ if (StringUtil.isNotBlank(rqUrlValue)) {
+ jsonMap.put("http_request_body", FileEdit.fileDownloadUrl(rqUrlValue, "_1"));
+ SourceList request = new SourceList();
+ request.setSource_oss_path(rqUrlValue);
+ request.setDestination_oss_path(FileEdit.fileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rqUrlValue, schemaType, "_1"));
+ jsonarray.add(request);
+ }
+ if (StringUtil.isNotBlank(rpUrlValue)) {
+ jsonMap.put("http_response_body", FileEdit.fileDownloadUrl(rpUrlValue, "_2"));
+ SourceList response = new SourceList();
+ response.setSource_oss_path(rpUrlValue);
+ response.setDestination_oss_path(FileEdit.fileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rpUrlValue, schemaType, "_2"));
+ jsonarray.add(response);
+ }
+ if (StringUtil.isNotBlank(emailUrlValue)) {
+ jsonMap.put("mail_eml_file", FileEdit.fileDownloadUrl(emailUrlValue, "_9"));
+ SourceList emailFile = new SourceList();
+ emailFile.setSource_oss_path(emailUrlValue);
+ emailFile.setDestination_oss_path(FileEdit.fileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, emailUrlValue, schemaType, "_9"));
+ jsonarray.add(emailFile);
+ }
+ fileMeta.setSourceList(jsonarray);
+ fileMeta.setCommon_log_id((long) jsonMap.get("common_log_id"));
+ fileMeta.setCommon_recv_time(Integer.parseInt(jsonMap.get("common_recv_time").toString()));
+ fileMeta.setCommon_schema_type((String) jsonMap.get("common_schema_type"));
+ fileMeta.setProcessing_time((int) (System.currentTimeMillis() / 1000));
+
+ context.output(metaToKafa, JSONObject.toJSONString(fileMeta));
+ }
+ collector.collect(JsonMapper.toJsonString(jsonMap));
+ } else {
+ collector.collect(message);
+ }
+        } catch (RuntimeException e) {
+            logger.error("处理带有非结构化字段的日志出错:" + e + "\n" + message);
+        }
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
new file mode 100644
index 0000000..de507ad
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
@@ -0,0 +1,17 @@
+package com.zdjizhi.utils.functions;
+
+import com.zdjizhi.utils.StringUtil;
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/5/27 15:01
+ */
+public class FilterNullFunction implements FilterFunction<String> {
+ @Override
+ public boolean filter(String message) {
+ return StringUtil.isNotBlank(message);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
new file mode 100644
index 0000000..5e5d0b7
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
@@ -0,0 +1,21 @@
+package com.zdjizhi.utils.functions;
+
+
+import com.zdjizhi.utils.general.TransFormMap;
+import org.apache.flink.api.common.functions.MapFunction;
+
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/5/27 15:01
+ */
+public class MapCompletedFunction implements MapFunction<String, String> {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public String map(String logs) {
+ return TransFormMap.dealCommonMessage(logs);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/ObjectCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/ObjectCompletedFunction.java
new file mode 100644
index 0000000..131d2f6
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/ObjectCompletedFunction.java
@@ -0,0 +1,20 @@
+package com.zdjizhi.utils.functions;
+
+import com.zdjizhi.utils.general.TransFormObject;
+import org.apache.flink.api.common.functions.MapFunction;
+
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/5/27 15:01
+ */
+public class ObjectCompletedFunction implements MapFunction<String, String> {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public String map(String logs) {
+ return TransFormObject.dealCommonMessage(logs);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java
new file mode 100644
index 0000000..46c8709
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/TypeMapCompletedFunction.java
@@ -0,0 +1,23 @@
+package com.zdjizhi.utils.functions;
+
+import com.zdjizhi.utils.fast.TransFormFast;
+import com.zdjizhi.utils.general.TransFormTypeMap;
+import org.apache.flink.api.common.functions.MapFunction;
+
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/5/27 15:01
+ */
+public class TypeMapCompletedFunction implements MapFunction<String, String> {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public String map(String logs) {
+
+ return TransFormTypeMap.dealCommonMessage(logs);
+// return TransFormFast.dealCommonMessage(logs);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/general/FileEdit.java b/src/main/java/com/zdjizhi/utils/general/FileEdit.java
new file mode 100644
index 0000000..cefb351
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/general/FileEdit.java
@@ -0,0 +1,51 @@
+package com.zdjizhi.utils.general;
+
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.ordinary.MD5Utils;
+
+import static com.zdjizhi.utils.system.FlowWriteConfigurations.judgeFileType;
+
+
+/**
+ * Utilities for building file upload/download URLs
+ */
+public class FileEdit {
+
+
+    public static String fileUploadUrl(long cfgId, String sIp, int sPort, String dIp, int dPort, long foundTime, String account, String domain, String urlValue, String schemaType, String fileSuffix) throws Exception {
+        String fileType = null;
+        if (judgeFileType(getFileType(urlValue))) {
+            fileType = getFileType(urlValue);
+        } else {
+            if (schemaType.equals("HTTP")) {
+                fileType = "html";
+            }
+            if (schemaType.equals("MAIL")) {
+                fileType = "eml";
+            }
+        }
+        return "http://" + FlowWriteConfig.OOS_SERVERS + "/upload_v2" + "/" + cfgId + "/" + fileType + "/" + sIp + "/" + sPort + "/" + dIp + "/" + dPort + "/" + foundTime + "/" + account + "/" + domain + "/" + getFileName(urlValue, fileSuffix);
+    }
+
+    public static String fileDownloadUrl(String urlValue, String fileSuffix) throws Exception {
+        return "http://" + FlowWriteConfig.OOS_SERVERS + "/download_v2" + "/" + getFileName(urlValue, fileSuffix);
+ }
+
+
+ public static String getFileType(String url){
+ String[] split = url.split("\\.");
+ return split[split.length-1];
+ }
+
+ public static String getFileName(String url,String fileSuffix) throws Exception {
+ String[] arr = url.split("/");
+ String filename = arr[arr.length-1].substring(0,arr[arr.length-1].lastIndexOf("_"));
+ String prefix = MD5Utils.md5Encode(filename);
+// String suffix = arr[arr.length-1].substring(arr[arr.length-1].lastIndexOf("_"),arr[arr.length-1].lastIndexOf("."));
+ return prefix+fileSuffix;
+ }
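+
+    /*
+     * Example of the values produced above (illustrative input; OOS_SERVERS comes from FlowWriteConfig):
+     *
+     *   urlValue = ".../report_0.pdf", fileSuffix = "_1"
+     *   getFileType(urlValue)            -> "pdf"
+     *   getFileName(urlValue, "_1")      -> md5("report") + "_1"
+     *   fileDownloadUrl(urlValue, "_1")  -> "http://<OOS_SERVERS>/download_v2/<md5>_1"
+     */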
+}
diff --git a/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java
new file mode 100644
index 0000000..168fec2
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java
@@ -0,0 +1,213 @@
+package com.zdjizhi.utils.general;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.zookeeper.DistributedLock;
+import com.zdjizhi.utils.zookeeper.ZookeeperUtils;
+
+/**
+ * 雪花算法
+ *
+ * @author qidaijie
+ */
+public class SnowflakeId {
+ private static final Log logger = LogFactory.get();
+
+ /**
+     * 64 bits in total; the first bit is the sign bit and stays 0.
+     * Timestamp: 39 bits (~17 years), dataCenterId (one per environment/job): 5 bits (0-31),
+     * workerId (one per process): 8 bits (0-255), sequence: 11 bits (2047/ms)
+     *
+     * sequences per ms = (-1L ^ (-1L << 11))
+     * max usable years = (1L << 39) / (1000L * 60 * 60 * 24 * 365)
+ */
+ /**
+     * Epoch start (2020-11-14 00:00:00); usable for about 17 years
+ */
+ private final long twepoch = 1605283200000L;
+
+ /**
+     * Number of bits allocated to the worker id
+ */
+ private final long workerIdBits = 8L;
+
+ /**
+     * Number of bits allocated to the data-center id
+ */
+ private final long dataCenterIdBits = 5L;
+
+ /**
+     * Maximum supported worker id; with 8 bits this is 255 (the shift below quickly yields the largest value an n-bit number can hold)
+ * M << n = M * 2^n
+ */
+ private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
+
+ /**
+     * Maximum supported data-center id; with 5 bits this is 31
+ */
+ private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits);
+
+ /**
+     * Number of bits allocated to the per-millisecond sequence
+ */
+ private final long sequenceBits = 11L;
+
+ /**
+     * Worker id is shifted left by 11 bits (the sequence width)
+ */
+ private final long workerIdShift = sequenceBits;
+
+ /**
+     * Data-center id is shifted left by 19 bits (11 + 8)
+ */
+ private final long dataCenterIdShift = sequenceBits + workerIdBits;
+
+ /**
+     * Timestamp is shifted left by 24 bits (11 + 8 + 5)
+ */
+ private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;
+
+ /**
+     * Mask for the sequence, 2047 here
+ */
+ private final long sequenceMask = -1L ^ (-1L << sequenceBits);
+
+ /**
+     * Worker machine id (0~255)
+ */
+ private long workerId;
+
+ /**
+     * Data-center id (0~31)
+ */
+ private long dataCenterId;
+
+ /**
+     * Sequence within the millisecond (0~2047)
+ */
+ private long sequence = 0L;
+
+ /**
+     * Timestamp of the last generated id
+ */
+ private long lastTimestamp = -1L;
+
+
+ /**
+     * Maximum tolerated clock rollback: 10 seconds
+ */
+ private static final long rollBackTime = 10000L;
+
+
+ private static SnowflakeId idWorker;
+
+ private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils();
+
+ static {
+ idWorker = new SnowflakeId(FlowWriteConfig.ZOOKEEPER_SERVERS, FlowWriteConfig.DATA_CENTER_ID_NUM);
+ }
+
+ //==============================Constructors=====================================
+
+ /**
+     * Constructor: obtains a worker id from Zookeeper while holding a distributed lock
+ */
+ private SnowflakeId(String zookeeperIp, long dataCenterIdNum) {
+ DistributedLock lock = new DistributedLock(FlowWriteConfig.ZOOKEEPER_SERVERS, "disLocks1");
+ try {
+ lock.lock();
+ int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + "worker" + dataCenterIdNum, zookeeperIp);
+ if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) {
+ throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
+ }
+ if (dataCenterIdNum > maxDataCenterId || dataCenterIdNum < 0) {
+ throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than ", maxDataCenterId));
+ }
+ this.workerId = tmpWorkerId;
+ this.dataCenterId = dataCenterIdNum;
+ } catch (RuntimeException e) {
+ logger.error("This is not usual error!!!===>>>" + e + "<<<===");
+ }finally {
+ lock.unlock();
+ }
+ }
+
+ // ==============================Methods==========================================
+
+ /**
+     * Get the next id (this method is thread-safe)
+ *
+ * @return SnowflakeId
+ */
+ private synchronized long nextId() {
+ long timestamp = timeGen();
+        // Allow a bounded clock rollback: within rollBackTime we simply wait until the clock catches up
+ if (lastTimestamp - timestamp > 0 && lastTimestamp - timestamp < rollBackTime) {
+ timestamp = tilNextMillis(lastTimestamp);
+ }
+        // If the current time is still behind the last generation time, the clock moved back too far: throw
+ if (timestamp < lastTimestamp) {
+ throw new RuntimeException(
+ String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
+ }
+
+        // Same millisecond as the previous id: advance the intra-millisecond sequence
+ if (lastTimestamp == timestamp) {
+ sequence = (sequence + 1) & sequenceMask;
+            // Sequence overflow within the millisecond
+ if (sequence == 0) {
+                // Block until the next millisecond to get a fresh timestamp
+ timestamp = tilNextMillis(lastTimestamp);
+ }
+ }
+        // Timestamp changed: reset the intra-millisecond sequence
+ else {
+ sequence = 0L;
+ }
+
+        // Remember the timestamp of this generation
+ lastTimestamp = timestamp;
+
+        // Shift the parts into place and OR them together into the 64-bit id
+ return ((timestamp - twepoch) << timestampLeftShift)
+ | (dataCenterId << dataCenterIdShift)
+ | (workerId << workerIdShift)
+ | sequence;
+ }
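+
+    /*
+     * Layout of the generated id (matches the shift constants above):
+     *
+     *   id = ((timestamp - twepoch) << 24) | (dataCenterId << 19) | (workerId << 11) | sequence
+     *
+     * 39 timestamp bits cover (1L << 39) ms, roughly 17.4 years from 2020-11-14, and each worker
+     * can issue up to 2048 ids per millisecond (sequence 0-2047).
+     */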
+
+ /**
+     * Block until the next millisecond and return the new timestamp
+     *
+     * @param lastTimestamp timestamp of the last generated id
+     * @return current timestamp
+ */
+ protected long tilNextMillis(long lastTimestamp) {
+ long timestamp = timeGen();
+ while (timestamp <= lastTimestamp) {
+ timestamp = timeGen();
+ }
+ return timestamp;
+ }
+
+ /**
+     * Return the current time in milliseconds
+     *
+     * @return current time (ms)
+ */
+ protected long timeGen() {
+ return System.currentTimeMillis();
+ }
+
+
+ /**
+     * Static entry point for generating ids
+     *
+     * @return a new snowflake id
+ */
+ public static Long generateId() {
+ return idWorker.nextId();
+ }
+
+
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
new file mode 100644
index 0000000..5ae9859
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
@@ -0,0 +1,145 @@
+package com.zdjizhi.utils.general;
+
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.json.JsonParseUtil;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+
+/**
+ * Transformation/completion utility backed by a plain Map
+ *
+ * @author qidaijie
+ */
+public class TransFormMap {
+ private static final Log logger = LogFactory.get();
+
+ /**
+     * Job list.
+     * Each element is a 4-tuple of strings (field carrying the format marker, field to complete, function to apply, extra parameter), e.g.:
+     * (mail_subject mail_subject decode_of_base64 mail_subject_charset)
+ */
+ private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);
+
+ /**
+     * Parse a log line and complete its fields
+     *
+     * @param message raw log from the Kafka topic
+     * @return completed log
+ */
+ @SuppressWarnings("unchecked")
+ public static String dealCommonMessage(String message) {
+ try {
+ if (StringUtil.isNotBlank(message)) {
+ Map<String, Object> jsonMap = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
+ JsonParseUtil.dropJsonField(jsonMap);
+ for (String[] strings : jobList) {
+                // value of the source field used as input
+                Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
+                // key of the field to complete
+                String appendToKeyName = strings[1];
+                // current value of the field to complete
+                Object appendTo = JsonParseUtil.getValue(jsonMap, appendToKeyName);
+                // name of the function to apply
+                String function = strings[2];
+                // extra parameter value
+                String param = strings[3];
+ functionSet(function, jsonMap, appendToKeyName, appendTo, logValue, param);
+ }
+ return JsonMapper.toJsonString(jsonMap);
+ } else {
+ return null;
+ }
+ } catch (RuntimeException e) {
+ logger.error("解析补全日志信息过程异常,异常信息:" + e + "\n" + message);
+ return null;
+ }
+ }
+
+
+ /**
+     * Set of functions applied to fields as described by the schema
+     *
+     * @param function        name of the function to apply
+     * @param jsonMap         parsed raw-log map
+     * @param appendToKeyName key of the field to complete
+     * @param appendTo        current value of the field to complete
+     * @param logValue        value of the source field
+     * @param param           extra parameter value
+ */
+ private static void functionSet(String function, Map<String, Object> jsonMap, String appendToKeyName, Object appendTo, Object logValue, String param) {
+ switch (function) {
+ case "current_timestamp":
+ if (!(appendTo instanceof Long)) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime());
+ }
+ break;
+ case "snowflake_id":
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId());
+ break;
+ case "geo_ip_detail":
+ if (logValue != null && appendTo == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(logValue.toString()));
+ }
+ break;
+ case "geo_asn":
+ if (logValue != null && appendTo == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoAsn(logValue.toString()));
+ }
+ break;
+ case "geo_ip_country":
+ if (logValue != null && appendTo == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpCountry(logValue.toString()));
+ }
+ break;
+ case "set_value":
+ if (param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, param);
+ }
+ break;
+ case "get_value":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, logValue);
+ }
+ break;
+ case "if":
+ if (param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.condition(jsonMap, param));
+ }
+ break;
+ case "sub_domain":
+ if (appendTo == null && logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getTopDomain(logValue.toString()));
+ }
+ break;
+ case "radius_match":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.radiusMatch(logValue.toString()));
+ }
+ break;
+ case "decode_of_base64":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param)));
+ }
+ break;
+ case "flattenSpec":
+ if (logValue != null && param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param));
+ }
+ break;
+ case "app_match":
+ if (logValue != null && appendTo == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString()));
+ }
+ break;
+ default:
+ }
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormObject.java b/src/main/java/com/zdjizhi/utils/general/TransFormObject.java
new file mode 100644
index 0000000..54629db
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormObject.java
@@ -0,0 +1,153 @@
+package com.zdjizhi.utils.general;
+
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.json.JsonParseUtil;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+
+/**
+ * Transformation/completion utility backed by a reflection-generated POJO
+ *
+ * @author qidaijie
+ */
+public class TransFormObject {
+ private static final Log logger = LogFactory.get();
+
+ /**
+     * Map of field names to types, used to build the reflected class in memory
+ */
+ private static HashMap<String, Class> map = JsonParseUtil.getMapFromHttp(FlowWriteConfig.SCHEMA_HTTP);
+
+ /**
+ * 反射成一个类
+ */
+ private static Object mapObject = JsonParseUtil.generateObject(map);
+
+ /**
+ * 获取任务列表
+ * list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如:
+ * (mail_subject mail_subject decode_of_base64 mail_subject_charset)
+ */
+ private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);
+
+ /**
+ * 解析日志,并补全
+ *
+ * @param message kafka Topic原始日志
+ * @return 补全后的日志
+ */
+ public static String dealCommonMessage(String message) {
+ try {
+ if (StringUtil.isNotBlank(message)) {
+ Object object = JsonMapper.fromJsonString(message, mapObject.getClass());
+ for (String[] strings : jobList) {
+ //用到的参数的值
+ Object name = JsonParseUtil.getValue(object, strings[0]);
+ //需要补全的字段的key
+ String appendToKeyName = strings[1];
+ //需要补全的字段的值
+ Object appendTo = JsonParseUtil.getValue(object, appendToKeyName);
+ //匹配操作函数的字段
+ String function = strings[2];
+ //额外的参数的值
+ String param = strings[3];
+ functionSet(function, object, appendToKeyName, appendTo, name, param);
+ }
+ return JsonMapper.toJsonString(object);
+ } else {
+ return null;
+ }
+ } catch (RuntimeException e) {
+ logger.error("解析补全日志信息过程异常,异常信息:" + e + "\n" + message);
+ return null;
+ }
+ }
+
+
+ /**
+ * 根据schema描述对应字段进行操作的 函数集合
+ *
+ * @param function 匹配操作函数的字段
+ * @param object 动态POJO Object
+ * @param appendToKeyName 需要补全的字段的key
+ * @param appendTo 需要补全的字段的值
+ * @param name 用到的参数的值
+ * @param param 额外的参数的值
+ */
+ private static void functionSet(String function, Object object, String appendToKeyName, Object appendTo, Object name, String param) {
+ switch (function) {
+ case "current_timestamp":
+ if (!(appendTo instanceof Long)) {
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getCurrentTime());
+ }
+ break;
+ case "snowflake_id":
+ JsonParseUtil.setValue(object, appendToKeyName, SnowflakeId.generateId());
+ break;
+ case "geo_ip_detail":
+ if (name != null && appendTo == null) {
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoIpDetail(name.toString()));
+ }
+ break;
+ case "geo_asn":
+ if (name != null && appendTo == null) {
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoAsn(name.toString()));
+ }
+ break;
+ case "geo_ip_country":
+ if (name != null && appendTo == null) {
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getGeoIpCountry(name.toString()));
+ }
+ break;
+ case "set_value":
+ if (name != null && param != null) {
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.setValue(param));
+ }
+ break;
+ case "get_value":
+ if (name != null) {
+ JsonParseUtil.setValue(object, appendToKeyName, name);
+ }
+ break;
+ case "if":
+ if (param != null) {
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.condition(object, param));
+ }
+ break;
+ case "sub_domain":
+ if (appendTo == null && name != null) {
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.getTopDomain(name.toString()));
+ }
+ break;
+ case "radius_match":
+ if (name != null) {
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.radiusMatch(name.toString()));
+ }
+ break;
+ case "decode_of_base64":
+ if (name != null) {
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.decodeBase64(name.toString(), TransFunction.isJsonValue(object, param)));
+ }
+ break;
+ case "flattenSpec":
+ if (name != null && param != null) {
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.flattenSpec(name.toString(), param));
+ }
+ break;
+ case "app_match":
+ if (name != null && appendTo == null) {
+ JsonParseUtil.setValue(object, appendToKeyName, TransFunction.appMatch(name.toString()));
+ }
+ break;
+ default:
+ }
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
new file mode 100644
index 0000000..765e23e
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
@@ -0,0 +1,151 @@
+package com.zdjizhi.utils.general;
+
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.json.JsonParseUtil;
+import com.zdjizhi.utils.json.JsonTypeUtils;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import static com.alibaba.fastjson.serializer.SerializerFeature.WriteMapNullValue;
+
+
+/**
+ * Transformation/completion utility backed by a type-normalized map
+ *
+ * @author qidaijie
+ */
+public class TransFormTypeMap {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * 获取任务列表
+ * list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如:
+ * (mail_subject mail_subject decode_of_base64 mail_subject_charset)
+ */
+ private static ArrayList<String[]> jobList = JsonParseUtil.getJobListFromHttp(FlowWriteConfig.SCHEMA_HTTP);
+
+ /**
+ * 解析日志,并补全
+ *
+ * @param message kafka Topic原始日志
+ * @return 补全后的日志
+ */
+ @SuppressWarnings("unchecked")
+ public static String dealCommonMessage(String message) {
+ try {
+ if (StringUtil.isNotBlank(message)) {
+ Map<String, Object> map = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
+ Map<String, Object> jsonMap = JsonTypeUtils.typeTransform(map);
+ for (String[] strings : jobList) {
+ //用到的参数的值
+ Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
+ //需要补全的字段的key
+ String appendToKeyName = strings[1];
+ //需要补全的字段的值
+ Object appendToKeyValue = JsonParseUtil.getValue(jsonMap, appendToKeyName);
+ //匹配操作函数的字段
+ String function = strings[2];
+ //额外的参数的值
+ String param = strings[3];
+ functionSet(function, jsonMap, appendToKeyName, appendToKeyValue, logValue, param);
+ }
+ return JsonMapper.toJsonString(jsonMap);
+
+ } else {
+ return null;
+ }
+ } catch (RuntimeException e) {
+ logger.error("解析补全日志信息过程异常,异常信息:" + e + "\n" + message);
+ return null;
+ }
+ }
+
+
+ /**
+ * 根据schema描述对应字段进行操作的 函数集合
+ *
+ * @param function 匹配操作函数的字段
+ * @param jsonMap 原始日志解析map
+ * @param appendToKeyName 需要补全的字段的key
+ * @param appendToKeyValue 需要补全的字段的值
+ * @param logValue 用到的参数的值
+ * @param param 额外的参数的值
+ */
+ private static void functionSet(String function, Map<String, Object> jsonMap, String appendToKeyName, Object appendToKeyValue, Object logValue, String param) {
+ switch (function) {
+ case "current_timestamp":
+ if (!(appendToKeyValue instanceof Long)) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getCurrentTime());
+ }
+ break;
+ case "snowflake_id":
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, SnowflakeId.generateId());
+ break;
+ case "geo_ip_detail":
+ if (logValue != null && appendToKeyValue == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpDetail(logValue.toString()));
+ }
+ break;
+ case "geo_asn":
+ if (logValue != null && appendToKeyValue == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoAsn(logValue.toString()));
+ }
+ break;
+ case "geo_ip_country":
+ if (logValue != null && appendToKeyValue == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getGeoIpCountry(logValue.toString()));
+ }
+ break;
+ case "set_value":
+ if (param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, param);
+ }
+ break;
+ case "get_value":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, logValue);
+ }
+ break;
+ case "if":
+ if (param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.condition(jsonMap, param));
+ }
+ break;
+ case "sub_domain":
+ if (appendToKeyValue == null && logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.getTopDomain(logValue.toString()));
+ }
+ break;
+ case "radius_match":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.radiusMatch(logValue.toString()));
+ }
+ break;
+ case "app_match":
+ if (logValue != null && appendToKeyValue == null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString()));
+ }
+ break;
+ case "decode_of_base64":
+ if (logValue != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param)));
+ }
+ break;
+ case "flattenSpec":
+ if (logValue != null && param != null) {
+ JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param));
+ }
+ break;
+ default:
+ }
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
new file mode 100644
index 0000000..549f3cc
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
@@ -0,0 +1,280 @@
+package com.zdjizhi.utils.general;
+
+import cn.hutool.core.codec.Base64;
+import cn.hutool.core.text.StrSpliter;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.jayway.jsonpath.InvalidPathException;
+import com.jayway.jsonpath.JsonPath;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.FormatUtils;
+import com.zdjizhi.utils.IpLookup;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.app.AppUtils;
+import com.zdjizhi.utils.hbase.HBaseUtils;
+import com.zdjizhi.utils.json.JsonParseUtil;
+import com.zdjizhi.utils.json.TypeUtils;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * @author qidaijie
+ */
+class TransFunction {
+
+ private static final Log logger = LogFactory.get();
+
+ private static final Pattern PATTERN = Pattern.compile("[0-9]*");
+
+ /**
+ * IP定位库工具类
+ */
+ private static IpLookup ipLookup = new IpLookup.Builder(false)
+ .loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4.mmdb")
+ .loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6.mmdb")
+ .loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_private_v4.mmdb")
+ .loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_private_v6.mmdb")
+ .loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb")
+ .loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb")
+ .build();
+
+ /**
+ * 生成当前时间戳的操作
+ */
+ static long getCurrentTime() {
+
+ return System.currentTimeMillis() / 1000;
+ }
+
+ /**
+ * 根据clientIp获取location信息
+ *
+ * @param ip client IP
+ * @return ip地址详细信息
+ */
+ static String getGeoIpDetail(String ip) {
+
+ return ipLookup.cityLookupDetail(ip);
+
+ }
+
+ /**
+ * 根据ip获取asn信息
+ *
+ * @param ip client/server IP
+ * @return ASN
+ */
+ static String getGeoAsn(String ip) {
+
+ return ipLookup.asnLookup(ip);
+ }
+
+ /**
+ * 根据ip获取country信息
+ *
+ * @param ip server IP
+ * @return 国家
+ */
+ static String getGeoIpCountry(String ip) {
+
+ return ipLookup.countryLookup(ip);
+ }
+
+
+ /**
+ * radius借助HBase补齐
+ *
+ * @param ip client IP
+ * @return account
+ */
+ static String radiusMatch(String ip) {
+ return HBaseUtils.getAccount(ip.trim());
+ }
+
+ /**
+ * appId与缓存中对应关系补全appName
+ *
+ * @param appIds app id 列表
+ * @return appName
+ */
+ static String appMatch(String appIds) {
+ try {
+ String appId = StrSpliter.split(appIds, FlowWriteConfig.FORMAT_SPLITTER, true, true).get(0);
+ return AppUtils.getAppName(Integer.parseInt(appId));
+ } catch (NumberFormatException | ClassCastException exception) {
+ logger.error("APP ID列表分割转换异常,异常APP ID列表:" + appIds);
+ return "";
+ }
+ }
+
+ /**
+ * 解析顶级域名
+ *
+ * @param domain 初始域名
+ * @return 顶级域名
+ */
+ static String getTopDomain(String domain) {
+ try {
+ return FormatUtils.getTopPrivateDomain(domain);
+ } catch (StringIndexOutOfBoundsException outException) {
+ logger.error("解析顶级域名异常,异常域名:" + domain);
+ return "";
+ }
+ }
+
+ /**
+ * 根据编码解码base64
+ *
+ * @param message base64
+ * @param charset 编码
+ * @return 解码字符串
+ */
+ static String decodeBase64(String message, Object charset) {
+ String result = "";
+ try {
+ if (StringUtil.isNotBlank(message)) {
+ if (charset == null) {
+ result = Base64.decodeStr(message, FlowWriteConfig.MAIL_DEFAULT_CHARSET);
+ } else {
+ result = Base64.decodeStr(message, charset.toString());
+ }
+ }
+ } catch (RuntimeException rune) {
+ logger.error("解析 Base64 异常,异常信息:" + rune);
+ }
+ return result;
+ }
+
+ /**
+ * 根据表达式解析json
+ *
+ * @param message json
+ * @param expr 解析表达式
+ * @return 解析结果
+ */
+ static String flattenSpec(String message, String expr) {
+ String flattenResult = "";
+ try {
+ if (StringUtil.isNotBlank(expr)) {
+ ArrayList<String> read = JsonPath.parse(message).read(expr);
+ if (read.size() >= 1) {
+ flattenResult = read.get(0);
+ }
+ }
+ } catch (ClassCastException | InvalidPathException | ArrayIndexOutOfBoundsException e) {
+ logger.error("设备标签解析异常,[ " + expr + " ]解析表达式错误" + e);
+ }
+ return flattenResult;
+ }
+
+
+ /**
+ * 判断是否为日志字段,是则返回对应value,否则返回原始字符串
+ *
+ * @param object 内存实体类
+ * @param param 字段名/普通字符串
+ * @return JSON.Value or String
+ */
+ static Object isJsonValue(Object object, String param) {
+ if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) {
+ return JsonParseUtil.getValue(object, param.substring(2));
+ } else {
+ return param;
+ }
+ }
+
+ /**
+ * 判断是否为日志字段,是则返回对应value,否则返回原始字符串
+ *
+ * @param jsonMap 内存实体类
+ * @param param 字段名/普通字符串
+ * @return JSON.Value or String
+ */
+ static Object isJsonValue(Map<String, Object> jsonMap, String param) {
+ if (param.contains(FlowWriteConfig.IS_JSON_KEY_TAG)) {
+ return JsonParseUtil.getValue(jsonMap, param.substring(2));
+ } else {
+ return param;
+ }
+ }
+
+ /**
+ * IF函数实现,解析日志构建三目运算;包含判断是否为数字若为数字则转换为long类型返回结果。
+ *
+ * @param object 内存实体类
+ * @param ifParam 字段名/普通字符串
+ * @return resultA or resultB or null
+ */
+ static Object condition(Object object, String ifParam) {
+ Object result = null;
+ try {
+ String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
+ if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
+ String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
+ Object direction = isJsonValue(object, norms[0]);
+ Object resultA = isJsonValue(object, split[1]);
+ Object resultB = isJsonValue(object, split[2]);
+ if (direction instanceof Number) {
+ result = TypeUtils.castToIfFunction((Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB);
+ } else if (direction instanceof String) {
+ result = TypeUtils.castToIfFunction(direction.equals(norms[1]) ? resultA : resultB);
+ }
+ }
+ } catch (RuntimeException e) {
+ logger.error("IF 函数执行异常,异常信息:" + e);
+ }
+ return result;
+ }
+
+ /**
+ * IF函数实现,解析日志构建三目运算;包含判断是否为数字若为数字则转换为long类型返回结果。
+ *
+ * @param jsonMap 内存实体类
+ * @param ifParam 字段名/普通字符串
+ * @return resultA or resultB or null
+ */
+ static Object condition(Map<String, Object> jsonMap, String ifParam) {
+ Object result = null;
+ try {
+ String[] split = ifParam.split(FlowWriteConfig.FORMAT_SPLITTER);
+ if (split.length == FlowWriteConfig.IF_PARAM_LENGTH) {
+ String[] norms = split[0].split(FlowWriteConfig.IF_CONDITION_SPLITTER);
+ Object direction = isJsonValue(jsonMap, norms[0]);
+ Object resultA = isJsonValue(jsonMap, split[1]);
+ Object resultB = isJsonValue(jsonMap, split[2]);
+ if (direction instanceof Number) {
+ result = (Integer.parseInt(direction.toString()) == Integer.parseInt(norms[1])) ? resultA : resultB;
+ } else if (direction instanceof String) {
+ result = direction.equals(norms[1]) ? resultA : resultB;
+ }
+ }
+ } catch (RuntimeException e) {
+ logger.error("IF 函数执行异常,异常信息:" + e);
+ }
+ return result;
+ }
+
+
+ /**
+ * 设置固定值函数 若为数字则转为long返回
+ *
+ * @param param 默认值
+ * @return 返回数字或字符串
+ */
+ static Object setValue(String param) {
+ try {
+ Matcher isNum = PATTERN.matcher(param);
+ if (isNum.matches()) {
+ return Long.parseLong(param);
+ } else {
+ return param;
+ }
+ } catch (RuntimeException e) {
+ logger.error("SetValue 函数异常,异常信息:" + e);
+ }
+ return null;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
new file mode 100644
index 0000000..de5e149
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
@@ -0,0 +1,210 @@
+package com.zdjizhi.utils.hbase;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.FlowWriteConfig;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * HBase utility: caches the RADIUS framed-ip to account mapping and refreshes it on a schedule
+ *
+ * @author qidaijie
+ */
+
+public class HBaseUtils {
+ private static final Log logger = LogFactory.get();
+ private static Map<String, String> subIdMap = new ConcurrentHashMap<>(16);
+ private static Connection connection;
+ private static Long time;
+
+ private static HBaseUtils hBaseUtils;
+
+ private static void getInstance() {
+ hBaseUtils = new HBaseUtils();
+ }
+
+
+ /**
+     * Constructor: open the connection, load the full cache, then schedule periodic updates
+ */
+ private HBaseUtils() {
+        // open the HBase connection
+        getConnection();
+        // load the full mapping
+        getAll();
+        // schedule periodic updates
+ updateCache();
+ }
+
+ private static void getConnection() {
+ try {
+            // HBase client configuration
+            Configuration configuration = HBaseConfiguration.create();
+            // Zookeeper quorum used by HBase
+ configuration.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS);
+ configuration.set("hbase.client.retries.number", "3");
+ configuration.set("hbase.bulkload.retries.number", "3");
+ configuration.set("zookeeper.recovery.retry", "3");
+ connection = ConnectionFactory.createConnection(configuration);
+ time = System.currentTimeMillis();
+ logger.warn("HBaseUtils get HBase connection,now to getAll().");
+ } catch (IOException ioe) {
+ logger.error("HBaseUtils getHbaseConn() IOException===>{" + ioe + "}<===");
+ } catch (RuntimeException e) {
+ logger.error("HBaseUtils getHbaseConn() Exception===>{" + e + "}<===");
+ }
+ }
+
+ /**
+     * Refresh the cache with rows changed since the last update
+ */
+ private static void change() {
+ if (hBaseUtils == null) {
+ getInstance();
+ }
+ long nowTime = System.currentTimeMillis();
+ timestampsFilter(time - 1000, nowTime + 500);
+ }
+
+
+ /**
+     * Fetch rows changed within the time range and apply them to the cache
+     *
+     * @param startTime start time
+     * @param endTime   end time
+ */
+ private static void timestampsFilter(Long startTime, Long endTime) {
+ Long begin = System.currentTimeMillis();
+ Table table = null;
+ ResultScanner scanner = null;
+ Scan scan2 = new Scan();
+ try {
+ table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_TABLE_NAME));
+ scan2.setTimeRange(startTime, endTime);
+ scanner = table.getScanner(scan2);
+ for (Result result : scanner) {
+ int acctStatusType = getAcctStatusType(result);
+ String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))).trim();
+ String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account"))).trim();
+ if (acctStatusType == 1) {
+ if (subIdMap.containsKey(framedIp)) {
+ boolean same = account.equals(subIdMap.get(framedIp));
+ if (!same) {
+ subIdMap.put(framedIp, account);
+ }
+ } else {
+ subIdMap.put(framedIp, account);
+ }
+ } else if (acctStatusType == 2) {
+ subIdMap.remove(framedIp);
+ }
+ }
+ Long end = System.currentTimeMillis();
+ logger.warn("HBaseUtils Now subIdMap.keySet().size() is: " + subIdMap.keySet().size());
+ logger.warn("HBaseUtils Update cache timeConsuming is: " + (end - begin) + ",BeginTime: " + startTime + ",EndTime: " + endTime);
+ time = endTime;
+ } catch (IOException ioe) {
+ logger.error("HBaseUtils timestampsFilter is IOException===>{" + ioe + "}<===");
+ } catch (RuntimeException e) {
+ logger.error("HBaseUtils timestampsFilter is Exception===>{" + e + "}<===");
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ if (table != null) {
+ try {
+ table.close();
+ } catch (IOException e) {
+ logger.error("HBase Table Close ERROR! Exception message is:" + e);
+ }
+ }
+ }
+ }
+
+ /**
+     * Load all key-value pairs into the cache
+ */
+ private static void getAll() {
+ long begin = System.currentTimeMillis();
+ try {
+ Table table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_TABLE_NAME));
+ Scan scan2 = new Scan();
+ ResultScanner scanner = table.getScanner(scan2);
+ for (Result result : scanner) {
+ int acctStatusType = getAcctStatusType(result);
+ String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip")));
+ String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account")));
+ if (acctStatusType == 1) {
+ subIdMap.put(framedIp, account);
+ }
+ }
+ logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size(): " + subIdMap.size());
+ logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size() timeConsuming is: " + (System.currentTimeMillis() - begin));
+ scanner.close();
+ } catch (IOException ioe) {
+ logger.error("HBaseUtils getAll() is IOException===>{" + ioe + "}<===");
+ } catch (RuntimeException e) {
+ logger.error("HBaseUtils getAll() is Exception===>{" + e + "}<===");
+ }
+ }
+
+ /**
+     * Scheduled refresh: periodically pulls cache changes from HBase
+ */
+ private void updateCache() {
+// ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
+// new BasicThreadFactory.Builder().namingPattern("hbase-change-pool-%d").daemon(true).build());
+ ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
+ executorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ if (FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS != 0) {
+ change();
+ }
+ } catch (RuntimeException e) {
+ logger.error("HBaseUtils update hbaseCache is error===>{" + e + "}<===");
+ }
+ }
+ }, 1, FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS);
+ }
+
+
+ /**
+     * Get the account for a client IP
+ *
+ * @param clientIp client_ip
+ * @return account
+ */
+ public static String getAccount(String clientIp) {
+
+ if (hBaseUtils == null) {
+ getInstance();
+ }
+ return subIdMap.get(clientIp);
+
+ }
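+
+    /*
+     * Illustrative call (IP value is made up): returns null when the IP has no active
+     * RADIUS session in the cached subIdMap.
+     *
+     *   String account = HBaseUtils.getAccount("192.0.2.10");
+     */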
+
+ private static int getAcctStatusType(Result result) {
+ boolean hasType = result.containsColumn(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"));
+ if (hasType) {
+ return Bytes.toInt(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")));
+ } else {
+ return 1;
+ }
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java
new file mode 100644
index 0000000..1adb1d1
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/http/HttpClientUtil.java
@@ -0,0 +1,77 @@
+package com.zdjizhi.utils.http;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ * Utility for fetching the schema from the gateway
+ *
+ * @author qidaijie
+ */
+public class HttpClientUtil {
+ private static final Log logger = LogFactory.get();
+
+ /**
+     * Request the gateway and return the schema (only the first line of the response body is read)
+     *
+     * @param http gateway url
+ * @return schema
+ */
+ public static String requestByGetMethod(String http) {
+ CloseableHttpClient httpClient = HttpClients.createDefault();
+ StringBuilder entityStringBuilder;
+
+ HttpGet get = new HttpGet(http);
+ BufferedReader bufferedReader = null;
+ CloseableHttpResponse httpResponse = null;
+ try {
+ httpResponse = httpClient.execute(get);
+ HttpEntity entity = httpResponse.getEntity();
+ entityStringBuilder = new StringBuilder();
+ if (null != entity) {
+ bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024);
+ int intC;
+ while ((intC = bufferedReader.read()) != -1) {
+ char c = (char) intC;
+ if (c == '\n') {
+ break;
+ }
+ entityStringBuilder.append(c);
+ }
+
+ return entityStringBuilder.toString();
+ }
+ } catch (IOException e) {
+ logger.error("Get Schema from Query engine ERROR! Exception message is:" + e);
+ } finally {
+ if (httpClient != null) {
+ try {
+ httpClient.close();
+ } catch (IOException e) {
+ logger.error("Close HTTP Client ERROR! Exception messgae is:" + e);
+ }
+ }
+ if (httpResponse != null) {
+ try {
+ httpResponse.close();
+ } catch (IOException e) {
+ logger.error("Close httpResponse ERROR! Exception messgae is:" + e);
+ }
+ }
+ if (bufferedReader != null) {
+ IOUtils.closeQuietly(bufferedReader);
+ }
+ }
+ return "";
+ }
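+
+    /*
+     * Typical call in this project (SCHEMA_HTTP is the gateway schema URL from FlowWriteConfig):
+     *
+     *   String schema = HttpClientUtil.requestByGetMethod(FlowWriteConfig.SCHEMA_HTTP);
+     *   // an empty string is returned when the request fails
+     */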
+}
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
new file mode 100644
index 0000000..afa1bf3
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
@@ -0,0 +1,328 @@
+package com.zdjizhi.utils.json;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.jayway.jsonpath.JsonPath;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.http.HttpClientUtil;
+import net.sf.cglib.beans.BeanGenerator;
+import net.sf.cglib.beans.BeanMap;
+
+import java.util.*;
+
+/**
+ * JSON parsing utility built on FastJSON
+ *
+ * @author qidaijie
+ */
+public class JsonParseUtil {
+
+ private static final Log logger = LogFactory.get();
+
+ private static ArrayList<String> dropList = new ArrayList<>();
+
+ /**
+     * Map a schema type string to a Java class
+     *
+     * @param type type name
+     * @return class type
+ */
+
+ private static Class getClassName(String type) {
+ Class clazz;
+
+ switch (type) {
+ case "int":
+ clazz = Integer.class;
+ break;
+ case "string":
+ clazz = String.class;
+ break;
+ case "long":
+ clazz = long.class;
+ break;
+ case "array":
+ clazz = List.class;
+ break;
+ case "double":
+ clazz = double.class;
+ break;
+ case "float":
+ clazz = float.class;
+ break;
+ case "char":
+ clazz = char.class;
+ break;
+ case "byte":
+ clazz = byte.class;
+ break;
+ case "boolean":
+ clazz = boolean.class;
+ break;
+ case "short":
+ clazz = short.class;
+ break;
+ default:
+ clazz = String.class;
+ }
+ return clazz;
+ }
+
+ /**
+ * 获取属性值的方法
+ *
+ * @param obj 对象
+ * @param property key
+ * @return 属性的值
+ */
+ public static Object getValue(Object obj, String property) {
+ try {
+ BeanMap beanMap = BeanMap.create(obj);
+ return beanMap.get(property);
+ } catch (RuntimeException e) {
+ logger.error("获取json-value异常,异常key:" + property + "异常信息为:" + e);
+ return null;
+ }
+ }
+
+ /**
+ * 获取属性值的方法
+ *
+ * @param jsonMap 原始日志
+ * @param property key
+ * @return 属性的值
+ */
+ public static Object getValue(Map<String, Object> jsonMap, String property) {
+ try {
+ return jsonMap.getOrDefault(property, null);
+ } catch (RuntimeException e) {
+ logger.error("获取json-value异常,异常key:" + property + "异常信息为:" + e);
+ return null;
+ }
+ }
+
+ /**
+ * 获取属性值的方法
+ *
+ * @param jsonMap 原始日志
+ * @param property key
+ * @return 属性的值
+ */
+ public static Object getValue(JSONObject jsonMap, String property) {
+ try {
+ return jsonMap.getOrDefault(property, null);
+ } catch (RuntimeException e) {
+ logger.error("获取json-value异常,异常key:" + property + "异常信息为:" + e);
+ return null;
+ }
+ }
+
+ /**
+ * 更新属性值的方法
+ *
+ * @param jsonMap 原始日志json map
+ * @param property 更新的key
+ * @param value 更新的值
+ */
+ public static void setValue(Map<String, Object> jsonMap, String property, Object value) {
+ try {
+ jsonMap.put(property, value);
+ } catch (RuntimeException e) {
+ logger.error("赋予实体类错误类型数据", e);
+ }
+ }
+
+ /**
+ * 更新属性值的方法
+ *
+ * @param obj 对象
+ * @param property 更新的key
+ * @param value 更新的值
+ */
+ public static void setValue(Object obj, String property, Object value) {
+ try {
+ BeanMap beanMap = BeanMap.create(obj);
+ beanMap.put(property, value);
+ } catch (ClassCastException e) {
+ logger.error("赋予实体类错误类型数据", e);
+ }
+ }
+
+ /**
+ * 更新属性值的方法
+ *
+ * @param jsonMap 原始日志json map
+ * @param property 更新的key
+ * @param value 更新的值
+ */
+ public static void setValue(JSONObject jsonMap, String property, Object value) {
+ try {
+ jsonMap.put(property, value);
+ } catch (RuntimeException e) {
+ logger.error("赋予实体类错误类型数据", e);
+ }
+ }
+
+ /**
+ * 根据反射生成对象的方法
+ *
+ * @param properties 反射类用的map
+ * @return 生成的Object类型的对象
+ */
+ public static Object generateObject(Map properties) {
+ BeanGenerator generator = new BeanGenerator();
+ Set keySet = properties.keySet();
+ for (Object aKeySet : keySet) {
+ String key = (String) aKeySet;
+ generator.addProperty(key, (Class) properties.get(key));
+ }
+ return generator.create();
+ }
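+
+    /*
+     * Illustrative use of generateObject() with getValue/setValue (field name is an example only):
+     *
+     *   Map<String, Class> props = new HashMap<>();
+     *   props.put("common_client_ip", String.class);
+     *   Object bean = JsonParseUtil.generateObject(props);
+     *   JsonParseUtil.setValue(bean, "common_client_ip", "10.0.0.1");
+     *   JsonParseUtil.getValue(bean, "common_client_ip");   // -> "10.0.0.1"
+     */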
+
+ /**
+     * Fetch the gateway schema and build the map used to generate the schema-shaped object
+     *
+     * @param http gateway schema URL
+     * @return map of field names to classes, used to reflectively generate the schema object
+ */
+ public static HashMap<String, Class> getMapFromHttp(String http) {
+ HashMap<String, Class> map = new HashMap<>(16);
+
+ String schema = HttpClientUtil.requestByGetMethod(http);
+ Object data = JSON.parseObject(schema).get("data");
+
+ //获取fields,并转化为数组,数组的每个元素都是一个name doc type
+ JSONObject schemaJson = JSON.parseObject(data.toString());
+ JSONArray fields = (JSONArray) schemaJson.get("fields");
+
+ for (Object field : fields) {
+ String filedStr = field.toString();
+ if (checkKeepField(filedStr)) {
+ String name = JsonPath.read(filedStr, "$.name").toString();
+ String type = JsonPath.read(filedStr, "$.type").toString();
+ if (type.contains("{")) {
+ type = JsonPath.read(filedStr, "$.type.type").toString();
+ }
+ //组合用来生成实体类的map
+ map.put(name, getClassName(type));
+ } else {
+ dropList.add(filedStr);
+ }
+ }
+ return map;
+ }
+
+ /**
+     * Decide whether a field should be kept
+     *
+     * @param message a single field as json
+ * @return true or false
+ */
+ private static boolean checkKeepField(String message) {
+ boolean isKeepField = true;
+ boolean isHiveDoc = JSON.parseObject(message).containsKey("doc");
+ if (isHiveDoc) {
+ boolean isHiveVi = JsonPath.read(message, "$.doc").toString().contains("visibility");
+ if (isHiveVi) {
+ String visibility = JsonPath.read(message, "$.doc.visibility").toString();
+ if (FlowWriteConfig.VISIBILITY.equals(visibility)) {
+ isKeepField = false;
+ }
+ }
+ }
+ return isKeepField;
+ }
+
+ /**
+     * Drop fields marked invalid by the schema (jackson map)
+     * @param jsonMap parsed log map
+ */
+ public static void dropJsonField(Map<String, Object> jsonMap) {
+ for (String field : dropList) {
+ jsonMap.remove(field);
+ }
+ }
+
+ /**
+     * Drop fields marked invalid by the schema (fastjson object)
+     * @param jsonMap parsed log object
+ */
+ public static void dropJsonField(JSONObject jsonMap) {
+ for (String field : dropList) {
+ jsonMap.remove(field);
+ }
+ }
+
+ /**
+     * Fetch the schema from the gateway and parse it into a job list (source field, target field, function, parameter)
+     *
+     * @param http gateway url
+     * @return job list
+ */
+ public static ArrayList<String[]> getJobListFromHttp(String http) {
+ ArrayList<String[]> list = new ArrayList<>();
+
+ String schema = HttpClientUtil.requestByGetMethod(http);
+ //解析data
+ Object data = JSON.parseObject(schema).get("data");
+
+ //获取fields,并转化为数组,数组的每个元素都是一个name doc type
+ JSONObject schemaJson = JSON.parseObject(data.toString());
+ JSONArray fields = (JSONArray) schemaJson.get("fields");
+
+ for (Object field : fields) {
+
+ if (JSON.parseObject(field.toString()).containsKey("doc")) {
+ Object doc = JSON.parseObject(field.toString()).get("doc");
+
+ if (JSON.parseObject(doc.toString()).containsKey("format")) {
+ String name = JSON.parseObject(field.toString()).get("name").toString();
+ Object format = JSON.parseObject(doc.toString()).get("format");
+ JSONObject formatObject = JSON.parseObject(format.toString());
+
+ String functions = formatObject.get("functions").toString();
+ String appendTo = null;
+ String params = null;
+
+ if (formatObject.containsKey("appendTo")) {
+ appendTo = formatObject.get("appendTo").toString();
+ }
+
+ if (formatObject.containsKey("param")) {
+ params = formatObject.get("param").toString();
+ }
+
+
+ if (StringUtil.isNotBlank(appendTo) && StringUtil.isBlank(params)) {
+ String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER);
+ String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER);
+
+ for (int i = 0; i < functionArray.length; i++) {
+ list.add(new String[]{name, appendToArray[i], functionArray[i], null});
+ }
+
+ } else if (StringUtil.isNotBlank(appendTo) && StringUtil.isNotBlank(params)) {
+ String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER);
+ String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER);
+ String[] paramArray = params.split(FlowWriteConfig.FORMAT_SPLITTER);
+
+ for (int i = 0; i < functionArray.length; i++) {
+ list.add(new String[]{name, appendToArray[i], functionArray[i], paramArray[i]});
+
+ }
+ } else {
+ list.add(new String[]{name, name, functions, params});
+ }
+
+ }
+ }
+
+ }
+ return list;
+ }
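+
+    /*
+     * Illustrative schema field (shape inferred from the parsing above; names and values are examples):
+     *
+     *   {"name": "mail_subject", "type": "string",
+     *    "doc": {"format": {"functions": "decode_of_base64",
+     *                       "appendTo": "mail_subject",
+     *                       "param": "$.mail_subject_charset"}}}
+     *
+     * yields the job tuple {mail_subject, mail_subject, decode_of_base64, $.mail_subject_charset}.
+     */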
+
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java
new file mode 100644
index 0000000..0b6bc1e
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/json/JsonTypeUtils.java
@@ -0,0 +1,187 @@
+package com.zdjizhi.utils.json;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.exception.FlowWriteException;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.json
+ * @Description: Schema-driven type normalization for parsed log maps
+ * @date 2021/7/12 17:34
+ */
+public class JsonTypeUtils {
+ private static final Log logger = LogFactory.get();
+ /**
+     * Schema map of field names to types, loaded in memory
+ */
+ private static HashMap<String, Class> map = JsonParseUtil.getMapFromHttp(FlowWriteConfig.SCHEMA_HTTP);
+
+ /**
+ * Type conversion: cast every schema-declared field of the raw log map to its declared type
+ *
+ * @param jsonMap raw log map
+ */
+ public static Map<String, Object> typeTransform(Map<String, Object> jsonMap) throws RuntimeException {
+ JsonParseUtil.dropJsonField(jsonMap);
+ HashMap<String, Object> tmpMap = new HashMap<>(192);
+ for (String key : jsonMap.keySet()) {
+ if (map.containsKey(key)) {
+ String simpleName = map.get(key).getSimpleName();
+ switch (simpleName) {
+ case "String":
+ tmpMap.put(key, checkString(jsonMap.get(key)));
+ break;
+ case "Integer":
+ tmpMap.put(key, getIntValue(jsonMap.get(key)));
+ break;
+ case "long":
+ tmpMap.put(key, checkLongValue(jsonMap.get(key)));
+ break;
+ case "List":
+ tmpMap.put(key, checkArray(jsonMap.get(key)));
+ break;
+ case "Map":
+ tmpMap.put(key, checkObject(jsonMap.get(key)));
+ break;
+ case "double":
+ tmpMap.put(key, checkDouble(jsonMap.get(key)));
+ break;
+ default:
+ tmpMap.put(key, checkString(jsonMap.get(key)));
+ }
+ }
+ }
+ return tmpMap;
+ }
+
+ /**
+ * String type check and convert method
+ *
+ * @param value json value
+ * @return String value
+ */
+ private static String checkString(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Map){
+ return JsonMapper.toJsonString(value);
+ }
+
+ if (value instanceof List){
+ return JsonMapper.toJsonString(value);
+ }
+
+ return value.toString();
+ }
+
+ /**
+ * Map (object) type check and convert method
+ *
+ * @param value json value
+ * @return Map value
+ */
+ private static Map checkObject(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Map) {
+ return (Map) value;
+ }
+
+ throw new FlowWriteException("can not cast to map, value : " + value);
+ }
+
+ /**
+ * List (array) type check and convert method
+ *
+ * @param value json value
+ * @return List value
+ */
+ private static List checkArray(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof List) {
+ return (List) value;
+ }
+
+ throw new FlowWriteException("can not cast to List, value : " + value);
+ }
+
+ private static Long checkLong(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ return TypeUtils.castToLong(value);
+ }
+
+ /**
+ * long type check and convert method; returns the primitive default 0L when the value is null
+ *
+ * @param value json value
+ * @return Long value
+ */
+ private static long checkLongValue(Object value) {
+ Long longVal = TypeUtils.castToLong(value);
+ if (longVal == null) {
+ return 0L;
+ }
+
+// return longVal.longValue();
+ return longVal;
+ }
+
+ /**
+ * Double type check and convert method
+ *
+ * @param value json value
+ * @return Double value
+ */
+ private static Double checkDouble(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ return TypeUtils.castToDouble(value);
+ }
+
+
+ private static Integer checkInt(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ return TypeUtils.castToInt(value);
+ }
+
+
+ /**
+ * int type check and convert method; returns the primitive default 0 when the value is null
+ *
+ * @param value json value
+ * @return int value
+ */
+ private static int getIntValue(Object value) {
+
+ Integer intVal = TypeUtils.castToInt(value);
+ if (intVal == null) {
+ return 0;
+ }
+
+// return intVal.intValue();
+ return intVal;
+ }
+
+}
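A minimal sketch of how typeTransform is meant to be called. The field names and their schema types are assumptions for illustration, and the class still needs schema.http to be reachable because the type map is loaded in the static initializer.

    import java.util.HashMap;
    import java.util.Map;

    import com.zdjizhi.utils.json.JsonTypeUtils;

    public class TypeTransformSketch {
        public static void main(String[] args) {
            Map<String, Object> rawLog = new HashMap<>();
            rawLog.put("server_ip", "1.1.1.1"); // assumed to be declared as String in the schema
            rawLog.put("sessions", "42");       // assumed to be declared as long, so the string becomes 42L
            // Fields absent from the schema map are dropped; the rest are cast to their declared types.
            Map<String, Object> typed = JsonTypeUtils.typeTransform(rawLog);
            System.out.println(typed);
        }
    }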
diff --git a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java
new file mode 100644
index 0000000..b13627f
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java
@@ -0,0 +1,171 @@
+package com.zdjizhi.utils.json;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.exception.FlowWriteException;
+
+/**
+ * @author qidaijie
+ * @Package PACKAGE_NAME
+ * @Description:
+ * @date 2021/7/1218:20
+ */
+public class TypeUtils {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * Cast a value used by "if" functions to its basic Java type (String, int, long, or boolean as 1/0)
+ *
+ * @param value json value
+ * @return converted value or null
+ */
+ public static Object castToIfFunction(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof String) {
+ return value.toString();
+ }
+
+ if (value instanceof Integer) {
+ return ((Number) value).intValue();
+ }
+
+ if (value instanceof Long) {
+ return ((Number) value).longValue();
+ }
+
+// if (value instanceof Map) {
+// return (Map) value;
+// }
+//
+// if (value instanceof List) {
+// return Collections.singletonList(value.toString());
+// }
+
+ if (value instanceof Boolean) {
+ return (Boolean) value ? 1 : 0;
+ }
+
+ throw new FlowWriteException("can not cast to int, value : " + value);
+ }
+
+ /**
+ * Integer type check and convert method
+ *
+ * @param value json value
+ * @return Integer value or null
+ */
+ static Integer castToInt(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Integer) {
+ return (Integer) value;
+ }
+
+ if (value instanceof Number) {
+ return ((Number) value).intValue();
+ }
+
+ if (value instanceof String) {
+ String strVal = (String) value;
+ if (StringUtil.isBlank(strVal)) {
+ return null;
+ }
+
+ // convert values like "10,20" to 10 by keeping the first element
+ if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) {
+ strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0];
+ }
+
+ try {
+ return Integer.parseInt(strVal);
+ } catch (NumberFormatException ex) {
+ logger.error("String change Integer Error,The error Str is:" + strVal);
+ }
+ }
+
+ if (value instanceof Boolean) {
+ return (Boolean) value ? 1 : 0;
+ }
+
+ throw new FlowWriteException("can not cast to int, value : " + value);
+ }
+
+ /**
+ * Double type check and convert method
+ *
+ * @param value json value
+ * @return double value or null
+ */
+ static Double castToDouble(Object value) {
+
+ if (value instanceof Number) {
+ return ((Number) value).doubleValue();
+ }
+
+ if (value instanceof String) {
+ String strVal = (String) value;
+
+ if (StringUtil.isBlank(strVal)) {
+ return null;
+ }
+
+ // convert values like "10,20" to 10 by keeping the first element
+ if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) {
+ strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0];
+ }
+
+ try {
+ return Double.parseDouble(strVal);
+ } catch (NumberFormatException ex) {
+ logger.error("String change Double Error,The error Str is:" + strVal);
+ }
+ }
+
+ throw new FlowWriteException("can not cast to double, value : " + value);
+ }
+
+ /**
+ * Long type check and convert method
+ *
+ * @param value json value
+ * @return (Long)value or null
+ */
+ static Long castToLong(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Number) {
+ return ((Number) value).longValue();
+ }
+
+ if (value instanceof String) {
+ String strVal = (String) value;
+
+ if (StringUtil.isBlank(strVal)) {
+ return null;
+ }
+
+ // convert values like "10,20" to 10 by keeping the first element
+ if (strVal.contains(FlowWriteConfig.FORMAT_SPLITTER)) {
+ strVal = strVal.split(FlowWriteConfig.FORMAT_SPLITTER)[0];
+ }
+
+ try {
+ return Long.parseLong(strVal);
+ } catch (NumberFormatException ex) {
+ logger.error("String change Long Error,The error Str is:" + strVal);
+ }
+ }
+
+ throw new FlowWriteException("can not cast to long, value : " + value);
+ }
+
+}
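A few hedged examples of the casting rules above. The sketch sits in the same package because castToInt, castToLong and castToDouble are package-private, and the "10,20" case assumes FORMAT_SPLITTER is a comma.

    package com.zdjizhi.utils.json;

    public class TypeUtilsSketch {
        public static void main(String[] args) {
            System.out.println(TypeUtils.castToInt("10,20"));      // 10: only the part before the splitter is kept
            System.out.println(TypeUtils.castToInt(Boolean.TRUE)); // 1: booleans are mapped to 1/0
            System.out.println(TypeUtils.castToLong("123"));       // 123
            System.out.println(TypeUtils.castToDouble("3.14"));    // 3.14
        }
    }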
diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
new file mode 100644
index 0000000..b09eedb
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
@@ -0,0 +1,36 @@
+package com.zdjizhi.utils.kafka;
+
+import com.zdjizhi.common.FlowWriteConfig;
+import org.apache.kafka.common.config.SslConfigs;
+
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description:
+ * @date 2021/9/610:37
+ */
+class CertUtils {
+ static void chooseCert(String type, Properties properties) {
+ switch (type) {
+ case "SSL":
+ properties.put("security.protocol", "SSL");
+ properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+ properties.put("ssl.keystore.location", FlowWriteConfig.TOOLS_LIBRARY + "keystore.jks");
+ properties.put("ssl.keystore.password", FlowWriteConfig.KAFKA_PIN);
+ properties.put("ssl.truststore.location", FlowWriteConfig.TOOLS_LIBRARY + "truststore.jks");
+ properties.put("ssl.truststore.password", FlowWriteConfig.KAFKA_PIN);
+ properties.put("ssl.key.password", FlowWriteConfig.KAFKA_PIN);
+ break;
+ case "SASL":
+ properties.put("security.protocol", "SASL_PLAINTEXT");
+ properties.put("sasl.mechanism", "PLAIN");
+ properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ + FlowWriteConfig.KAFKA_USER + " password=" + FlowWriteConfig.KAFKA_PIN + ";");
+ break;
+ default:
+ }
+
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
new file mode 100644
index 0000000..95bbe0b
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
@@ -0,0 +1,44 @@
+package com.zdjizhi.utils.kafka;
+
+import com.zdjizhi.common.FlowWriteConfig;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.config.SslConfigs;
+
+
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description:
+ * @date 2021/6/813:54
+ */
+public class Consumer {
+ private static Properties createConsumerConfig() {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", FlowWriteConfig.INPUT_KAFKA_SERVERS);
+ properties.put("group.id", FlowWriteConfig.GROUP_ID);
+ properties.put("session.timeout.ms", FlowWriteConfig.SESSION_TIMEOUT_MS);
+ properties.put("max.poll.records", FlowWriteConfig.MAX_POLL_RECORDS);
+ properties.put("max.partition.fetch.bytes", FlowWriteConfig.MAX_PARTITION_FETCH_BYTES);
+ properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ CertUtils.chooseCert(FlowWriteConfig.KAFKA_SOURCE_PROTOCOL,properties);
+
+ return properties;
+ }
+
+ public static FlinkKafkaConsumer<String> getKafkaConsumer() {
+ FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.INPUT_KAFKA_TOPIC,
+ new SimpleStringSchema(), createConsumerConfig());
+
+ kafkaConsumer.setCommitOffsetsOnCheckpoints(false);
+ kafkaConsumer.setStartFromGroupOffsets();
+
+ return kafkaConsumer;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/Producer.java
new file mode 100644
index 0000000..58b3e7a
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/kafka/Producer.java
@@ -0,0 +1,79 @@
+package com.zdjizhi.utils.kafka;
+
+import com.zdjizhi.common.FlowWriteConfig;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SslConfigs;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description:
+ * @date 2021/6/814:04
+ */
+public class Producer {
+
+ private static Properties createPercentProducerConfig() {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", FlowWriteConfig.OUTPUT_KAFKA_SERVERS);
+ properties.put("acks", FlowWriteConfig.PRODUCER_ACK);
+ properties.put("retries", FlowWriteConfig.RETRIES);
+ properties.put("linger.ms", FlowWriteConfig.LINGER_MS);
+ properties.put("request.timeout.ms", FlowWriteConfig.REQUEST_TIMEOUT_MS);
+ properties.put("batch.size", FlowWriteConfig.BATCH_SIZE);
+ properties.put("buffer.memory", FlowWriteConfig.BUFFER_MEMORY);
+ properties.put("max.request.size", FlowWriteConfig.MAX_REQUEST_SIZE);
+ properties.put("compression.type", FlowWriteConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
+
+ CertUtils.chooseCert(FlowWriteConfig.KAFKA_SINK_PROTOCOL, properties);
+
+ return properties;
+ }
+ private static Properties createProducerConfig() {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", FlowWriteConfig.INPUT_KAFKA_SERVERS);
+ properties.put("acks", FlowWriteConfig.PRODUCER_ACK);
+ properties.put("retries", FlowWriteConfig.RETRIES);
+ properties.put("linger.ms", FlowWriteConfig.LINGER_MS);
+ properties.put("request.timeout.ms", FlowWriteConfig.REQUEST_TIMEOUT_MS);
+ properties.put("batch.size", FlowWriteConfig.BATCH_SIZE);
+ properties.put("buffer.memory", FlowWriteConfig.BUFFER_MEMORY);
+ properties.put("max.request.size", FlowWriteConfig.MAX_REQUEST_SIZE);
+ properties.put("compression.type", FlowWriteConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
+
+ CertUtils.chooseCert(FlowWriteConfig.KAFKA_SOURCE_PROTOCOL,properties);
+
+ return properties;
+ }
+
+ public static FlinkKafkaProducer<String> getPercentKafkaProducer() {
+ FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
+ FlowWriteConfig.PERCENT_KAFKA_TOPIC,
+ new SimpleStringSchema(),
+ createPercentProducerConfig(), Optional.empty());
+
+ kafkaProducer.setLogFailuresOnly(false);
+
+// kafkaProducer.setWriteTimestampToKafka(true);
+
+ return kafkaProducer;
+ }
+
+
+ public static FlinkKafkaProducer<String> getKafkaProducer() {
+ FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
+ FlowWriteConfig.OUTPUT_KAFKA_TOPIC,
+ new SimpleStringSchema(),
+ createProducerConfig(), Optional.empty());
+
+ kafkaProducer.setLogFailuresOnly(false);
+
+// kafkaProducer.setWriteTimestampToKafka(true);
+
+ return kafkaProducer;
+ }
+}
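A minimal wiring sketch showing how these factory methods would typically plug into a Flink job. The environment setup here is an assumption for illustration; the real wiring lives in LogFlowWriteTopology and includes the completion functions that are omitted below.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import com.zdjizhi.utils.kafka.Consumer;
    import com.zdjizhi.utils.kafka.Producer;

    public class KafkaWiringSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Read raw log lines from the input topic configured in service_flow_config.properties.
            DataStream<String> source = env.addSource(Consumer.getKafkaConsumer());
            // Write them back out through the producer built above (transformations omitted).
            source.addSink(Producer.getKafkaProducer());
            env.execute("log-flow-write-sketch");
        }
    }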
diff --git a/src/main/java/com/zdjizhi/utils/ordinary/MD5Utils.java b/src/main/java/com/zdjizhi/utils/ordinary/MD5Utils.java
new file mode 100644
index 0000000..aa55951
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/ordinary/MD5Utils.java
@@ -0,0 +1,64 @@
+package com.zdjizhi.utils.ordinary;
+
+import org.apache.log4j.Logger;
+
+import java.security.MessageDigest;
+
+/**
+ * Description: utility class for MD5 hashing
+ *
+ * @author Administrator
+ * @create 2018-08-13 15:11
+ */
+public class MD5Utils {
+ private static Logger logger = Logger.getLogger(MD5Utils.class);
+
+ public static String md5Encode(String msg) throws Exception {
+ try {
+ byte[] msgBytes = msg.getBytes("utf-8");
+ /*
+ * Obtain a MessageDigest instance for the MD5 algorithm
+ */
+ MessageDigest md5 = MessageDigest.getInstance("MD5");
+ /*
+ * Update the digest with the given bytes
+ */
+ md5.update(msgBytes);
+ /*
+ * Finish the hash computation and obtain the digest
+ */
+ byte[] digest = md5.digest();
+ /*
+ * The two lines above are equivalent to
+ * byte[] digest = md5.digest(msgBytes);
+ */
+ return byteArr2hexString(digest);
+ } catch (Exception e) {
+ logger.error("Error in conversion MD5! " + msg);
+// e.printStackTrace();
+ return "";
+ }
+ }
+
+ /**
+ * Convert a byte array to its hexadecimal string representation
+ *
+ * @param bys byte array
+ * @return hex string
+ */
+ public static String byteArr2hexString(byte[] bys) {
+ StringBuffer hexVal = new StringBuffer();
+ int val = 0;
+ for (byte by : bys) {
+ // convert the byte to an int; a negative byte must be ANDed with 0xff
+ val = ((int) by) & 0xff;
+ if (val < 16) {
+ hexVal.append("0");
+ }
+ hexVal.append(Integer.toHexString(val));
+ }
+
+ return hexVal.toString();
+
+ }
+}
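A small usage sketch of the MD5 helper; the input string is arbitrary and the digest noted in the comment is the well-known MD5 of "hello".

    import com.zdjizhi.utils.ordinary.MD5Utils;

    public class Md5Sketch {
        public static void main(String[] args) throws Exception {
            // Prints 5d41402abc4b2a76b9719d911017c592; an empty string is returned if hashing fails.
            System.out.println(MD5Utils.md5Encode("hello"));
        }
    }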
diff --git a/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java
new file mode 100644
index 0000000..f60def9
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/system/FlowWriteConfigurations.java
@@ -0,0 +1,81 @@
+package com.zdjizhi.utils.system;
+
+import com.zdjizhi.utils.StringUtil;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * @author Administrator
+ */
+
+public final class FlowWriteConfigurations {
+
+ private static Properties propKafka = new Properties();
+ private static Properties propService = new Properties();
+ private static Properties propfiletype = new Properties();
+
+ private static Map<String, String> fileTypeMap;
+
+
+ public static boolean judgeFileType(String filetype){
+ return fileTypeMap.containsKey(filetype);
+ }
+
+ public static String getStringProperty(Integer type, String key) {
+ if (type == 0) {
+ return propService.getProperty(key);
+ } else if (type == 1) {
+ return propKafka.getProperty(key);
+ } else {
+ return null;
+ }
+
+ }
+
+ public static Integer getIntProperty(Integer type, String key) {
+ if (type == 0) {
+ return Integer.parseInt(propService.getProperty(key));
+ } else if (type == 1) {
+ return Integer.parseInt(propKafka.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Long getLongProperty(Integer type, String key) {
+ if (type == 0) {
+ return Long.parseLong(propService.getProperty(key));
+ } else if (type == 1) {
+ return Long.parseLong(propKafka.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Boolean getBooleanProperty(Integer type, String key) {
+ if (type == 0) {
+ return StringUtil.equals(propService.getProperty(key).trim().toLowerCase(Locale.ENGLISH), "true");
+ } else if (type == 1) {
+ return StringUtil.equals(propKafka.getProperty(key).trim().toLowerCase(Locale.ENGLISH), "true");
+ } else {
+ return null;
+ }
+ }
+
+ static {
+ try {
+ propService.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
+ propKafka.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties"));
+ propfiletype.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("file_type.properties"));
+ fileTypeMap = new HashMap<String, String>((Map) propfiletype);
+ } catch (IOException | RuntimeException e) {
+ propKafka = null;
+ propService = null;
+ }
+ }
+}
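A brief usage sketch of the type parameter: 0 reads service_flow_config.properties and 1 reads default_config.properties. The keys below exist in those files; the surrounding class is illustrative only.

    import com.zdjizhi.utils.system.FlowWriteConfigurations;

    public class ConfigSketch {
        public static void main(String[] args) {
            String inputTopic = FlowWriteConfigurations.getStringProperty(0, "input.kafka.topic"); // service config
            Integer maxPoll = FlowWriteConfigurations.getIntProperty(1, "max.poll.records");       // kafka defaults
            System.out.println(inputTopic + " / " + maxPoll);
            System.out.println(FlowWriteConfigurations.judgeFileType("html")); // true, listed in file_type.properties
        }
    }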
diff --git a/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java b/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java
new file mode 100644
index 0000000..2afab03
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/zookeeper/DistributedLock.java
@@ -0,0 +1,190 @@
+package com.zdjizhi.utils.zookeeper;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * @author qidaijie
+ */
+public class DistributedLock implements Lock, Watcher {
+ private static final Log logger = LogFactory.get();
+
+ private ZooKeeper zk = null;
+ /**
+ * Root node
+ */
+ private final String ROOT_LOCK = "/locks";
+ /**
+ * Resource being contended for
+ */
+ private String lockName;
+ /**
+ * Previous lock node this instance waits on
+ */
+ private String waitLock;
+ /**
+ * Current lock node
+ */
+ private String currentLock;
+ /**
+ * Latch used to wait for the previous lock node to be released
+ */
+ private CountDownLatch countDownLatch;
+
+ private int sessionTimeout = 2000;
+
+ private List<Exception> exceptionList = new ArrayList<Exception>();
+
+ /**
+ * Configure the distributed lock
+ *
+ * @param config connection URL
+ * @param lockName resource to contend for
+ */
+ public DistributedLock(String config, String lockName) {
+ this.lockName = lockName;
+ try {
+ // connect to zookeeper
+ zk = new ZooKeeper(config, sessionTimeout, this);
+ Stat stat = zk.exists(ROOT_LOCK, false);
+ if (stat == null) {
+ // create the root node if it does not exist
+ zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ } catch (IOException | InterruptedException | KeeperException e) {
+ logger.error("Node already exists!");
+ }
+ }
+
+ // node watcher
+ @Override
+ public void process(WatchedEvent event) {
+ if (this.countDownLatch != null) {
+ this.countDownLatch.countDown();
+ }
+ }
+
+ @Override
+ public void lock() {
+ if (exceptionList.size() > 0) {
+ throw new LockException(exceptionList.get(0));
+ }
+ try {
+ if (this.tryLock()) {
+ logger.info(Thread.currentThread().getName() + " " + lockName + " acquired the lock");
+ } else {
+ // wait for the lock
+ waitForLock(waitLock, sessionTimeout);
+ }
+ } catch (InterruptedException | KeeperException e) {
+ logger.error("Exception while acquiring the lock: " + e);
+ }
+ }
+
+ @Override
+ public boolean tryLock() {
+ try {
+ String splitStr = "_lock_";
+ if (lockName.contains(splitStr)) {
+ throw new LockException("Invalid lock name");
+ }
+ // create an ephemeral sequential node
+ currentLock = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+ // fetch all child nodes
+ List<String> subNodes = zk.getChildren(ROOT_LOCK, false);
+ // collect the lock nodes that belong to this lockName
+ List<String> lockObjects = new ArrayList<String>();
+ for (String node : subNodes) {
+ String tmpNode = node.split(splitStr)[0];
+ if (tmpNode.equals(lockName)) {
+ lockObjects.add(node);
+ }
+ }
+ Collections.sort(lockObjects);
+ // if the current node is the smallest, the lock has been acquired
+ if (currentLock.equals(ROOT_LOCK + "/" + lockObjects.get(0))) {
+ return true;
+ }
+ // otherwise find the node immediately before the current one
+ String prevNode = currentLock.substring(currentLock.lastIndexOf("/") + 1);
+ waitLock = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1);
+ } catch (InterruptedException | KeeperException e) {
+ logger.error("获取锁过程异常" + e);
+ }
+ return false;
+ }
+
+
+ @Override
+ public boolean tryLock(long timeout, TimeUnit unit) {
+ try {
+ if (this.tryLock()) {
+ return true;
+ }
+ return waitForLock(waitLock, timeout);
+ } catch (KeeperException | InterruptedException | RuntimeException e) {
+ logger.error("判断是否锁定异常" + e);
+ }
+ return false;
+ }
+
+ // wait for the lock
+ private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException {
+ Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);
+
+ if (stat != null) {
+ this.countDownLatch = new CountDownLatch(1);
+ // wait on the latch; when the previous node disappears, process() counts down and the lock can be acquired
+ this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
+ this.countDownLatch = null;
+ }
+ return true;
+ }
+
+ @Override
+ public void unlock() {
+ try {
+ zk.delete(currentLock, -1);
+ currentLock = null;
+ zk.close();
+ } catch (InterruptedException | KeeperException e) {
+ logger.error("关闭锁异常" + e);
+ }
+ }
+
+ @Override
+ public Condition newCondition() {
+ return null;
+ }
+
+ @Override
+ public void lockInterruptibly() throws InterruptedException {
+ this.lock();
+ }
+
+
+ public class LockException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public LockException(String e) {
+ super(e);
+ }
+
+ public LockException(Exception e) {
+ super(e);
+ }
+ }
+
+} \ No newline at end of file
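A minimal usage sketch of the lock, assuming the zookeeper.servers address from service_flow_config.properties; the lock name and the critical section are placeholders.

    import com.zdjizhi.utils.zookeeper.DistributedLock;

    public class LockSketch {
        public static void main(String[] args) {
            DistributedLock lock = new DistributedLock("10.3.60.3:2181", "workerId");
            lock.lock();
            try {
                // critical section, e.g. claiming a snowflake worker id
            } finally {
                // unlock() deletes the ephemeral node and closes the ZooKeeper session.
                lock.unlock();
            }
        }
    }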
diff --git a/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java b/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java
new file mode 100644
index 0000000..9efbd46
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java
@@ -0,0 +1,140 @@
+package com.zdjizhi.utils.zookeeper;
+
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * @author qidaijie
+ * @Package cn.ac.iie.utils.zookeeper
+ * @Description:
+ * @date 2020/11/1411:28
+ */
+public class ZookeeperUtils implements Watcher {
+ private static final Log logger = LogFactory.get();
+ private static final int ID_MAX = 255;
+
+ private ZooKeeper zookeeper;
+
+ private static final int SESSION_TIME_OUT = 20000;
+
+ private CountDownLatch countDownLatch = new CountDownLatch(1);
+
+ @Override
+ public void process(WatchedEvent event) {
+ if (event.getState() == Event.KeeperState.SyncConnected) {
+ countDownLatch.countDown();
+ }
+ }
+
+
+ /**
+ * Modify the node data: read the current worker id and write back an incremented value
+ *
+ * @param path node path
+ * @param zookeeperIp zookeeper address
+ */
+ public int modifyNode(String path, String zookeeperIp) {
+ createNode(path, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, zookeeperIp);
+ int workerId = 0;
+ try {
+ connectZookeeper(zookeeperIp);
+ Stat stat = zookeeper.exists(path, true);
+ workerId = Integer.parseInt(getNodeDate(path));
+ if (workerId > ID_MAX) {
+ workerId = 0;
+ zookeeper.setData(path, "1".getBytes(), stat.getVersion());
+ } else {
+ String result = String.valueOf(workerId + 1);
+ if (stat != null) {
+ zookeeper.setData(path, result.getBytes(), stat.getVersion());
+ } else {
+ logger.error("Node does not exist!,Can't modify");
+ }
+ }
+ } catch (KeeperException | InterruptedException e) {
+ logger.error("modify error Can't modify," + e);
+ } finally {
+ closeConn();
+ }
+ logger.warn("workerID is:" + workerId);
+ return workerId;
+ }
+
+ /**
+ * Connect to zookeeper
+ *
+ * @param host zookeeper address
+ */
+ public void connectZookeeper(String host) {
+ try {
+ zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this);
+ countDownLatch.await();
+ } catch (IOException | InterruptedException e) {
+ logger.error("Connection to the Zookeeper Exception! message:" + e);
+ }
+ }
+
+ /**
+ * Close the zookeeper connection
+ */
+ public void closeConn() {
+ try {
+ if (zookeeper != null) {
+ zookeeper.close();
+ }
+ } catch (InterruptedException e) {
+ logger.error("Close the Zookeeper connection Exception! message:" + e);
+ }
+ }
+
+ /**
+ * Get the node data
+ *
+ * @param path node path
+ * @return node content, or null on exception
+ */
+ public String getNodeDate(String path) {
+ String result = null;
+ Stat stat = new Stat();
+ try {
+ byte[] resByte = zookeeper.getData(path, true, stat);
+
+ result = StrUtil.str(resByte, "UTF-8");
+ } catch (KeeperException | InterruptedException e) {
+ logger.error("Get node information exception" + e);
+ }
+ return result;
+ }
+
+ /**
+ * @param path path of the node to create
+ * @param date byte[] payload stored in the node
+ * @param acls ACL policy
+ * @param zookeeperIp zookeeper address
+ */
+ public void createNode(String path, byte[] date, List<ACL> acls, String zookeeperIp) {
+ try {
+ connectZookeeper(zookeeperIp);
+ Stat exists = zookeeper.exists(path, true);
+ if (exists == null) {
+ Stat existsSnowflakeld = zookeeper.exists("/Snowflake", true);
+ if (existsSnowflakeld == null) {
+ zookeeper.create("/Snowflake", null, acls, CreateMode.PERSISTENT);
+ }
+ zookeeper.create(path, date, acls, CreateMode.PERSISTENT);
+ } else {
+ logger.warn("Node already exists ! Don't need to create");
+ }
+ } catch (KeeperException | InterruptedException e) {
+ logger.error(e);
+ } finally {
+ closeConn();
+ }
+ }
+}
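A hedged usage sketch of the worker-id counter: modifyNode reads the current value under the given path, writes back an incremented value and returns the value it read (reset to 0 once the counter exceeds 255). The node path below is an assumption; SnowflakeId supplies the real one.

    import com.zdjizhi.utils.zookeeper.ZookeeperUtils;

    public class WorkerIdSketch {
        public static void main(String[] args) {
            ZookeeperUtils zkUtils = new ZookeeperUtils();
            int workerId = zkUtils.modifyNode("/Snowflake/worker", "10.3.60.3:2181");
            System.out.println("workerId = " + workerId);
        }
    }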
diff --git a/src/main/log4j.properties b/src/main/log4j.properties
new file mode 100644
index 0000000..9d91936
--- /dev/null
+++ b/src/main/log4j.properties
@@ -0,0 +1,25 @@
+#Log4j
+log4j.rootLogger=info,console,file
+# Console appender settings
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=info
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
+
+# File appender settings
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.file.Threshold=info
+log4j.appender.file.encoding=UTF-8
+log4j.appender.file.Append=true
+# Use a relative path and verify in testing that logs are written under the application directory
+log4j.appender.file.file=${nis.root}/log/galaxy-name.log
+log4j.appender.file.DatePattern='.'yyyy-MM-dd
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
+log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
+# MyBatis configuration; com.nis.web.dao is the package holding the mybatis interfaces
+log4j.logger.com.nis.web.dao=debug
+# BoneCP datasource configuration
+log4j.category.com.jolbox=debug,console
+
+
diff --git a/src/main/logback.xml b/src/main/logback.xml
new file mode 100644
index 0000000..a508b6b
--- /dev/null
+++ b/src/main/logback.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+ <!-- Output pattern: %date is the date, %thread the thread name, %-5level the level padded to 5 characters, %msg the log message, %n a newline -->
+ <property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" />
+ <!-- Path where log files are stored; do not use a relative path -->
+ <property name="LOG_FILE_PATH" value="E:/logs/demo.%d{yyyy-MM-dd}.%i.log" />
+
+ <!-- Console appender -->
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <!-- Print logs using the LOG_PATTERN defined above -->
+ <pattern>${LOG_PATTERN}</pattern>
+ </encoder>
+ </appender>
+
+ <!-- Roll to a new log file every day and keep 30 days of history; the rolling policy splits the files -->
+ <appender name="FILE"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>${LOG_FILE_PATH}</fileNamePattern>
+ <!-- keep 30 days' worth of history -->
+ <maxHistory>30</maxHistory>
+ <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+ <!-- Maximum size of a single log file -->
+ <maxFileSize>20MB</maxFileSize>
+ </timeBasedFileNamingAndTriggeringPolicy>
+ </rollingPolicy>
+
+ <encoder>
+ <pattern>${LOG_PATTERN}</pattern>
+ </encoder>
+ </appender>
+ <!-- Default log level for this project's packages -->
+ <logger name="com.example.demo" level="DEBUG" />
+
+ <!-- Root log level; common levels from high to low: ERROR, WARN, INFO, DEBUG. -->
+ <root level="INFO">
+ <appender-ref ref="CONSOLE" />
+ <appender-ref ref="FILE" /><!--对应appender name="FILE"。 -->
+ </root>
+</configuration> \ No newline at end of file
diff --git a/src/test/java/com/zdjizhi/FIleTypeTest.java b/src/test/java/com/zdjizhi/FIleTypeTest.java
new file mode 100644
index 0000000..031bccd
--- /dev/null
+++ b/src/test/java/com/zdjizhi/FIleTypeTest.java
@@ -0,0 +1,13 @@
+package com.zdjizhi;
+
+import static com.zdjizhi.utils.general.FileEdit.getFileType;
+import static com.zdjizhi.utils.system.FlowWriteConfigurations.judgeFileType;
+
+public class FIleTypeTest {
+ public static void main(String[] args) {
+ String fileType = getFileType("http://10.3.60.3:9098/hos/firewall_hos_bucket/http_response_body_10.11.36.187.49310-203.205.219.105.80_869195700602659293_1.html");
+ System.out.println(fileType);
+ boolean b = judgeFileType(fileType);
+ System.out.println(b);
+ }
+}
diff --git a/src/test/java/com/zdjizhi/KafkaTest.java b/src/test/java/com/zdjizhi/KafkaTest.java
new file mode 100644
index 0000000..4b034a3
--- /dev/null
+++ b/src/test/java/com/zdjizhi/KafkaTest.java
@@ -0,0 +1,55 @@
+package com.zdjizhi;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import org.apache.kafka.clients.producer.*;
+import org.apache.kafka.common.config.SslConfigs;
+
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi
+ * @Description:
+ * @date 2021/8/217:39
+ */
+public class KafkaTest {
+ private static final Log logger = LogFactory.get();
+
+ public static void main(String[] args) {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", "192.168.44.12:9091");
+ properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ properties.put("acks", "1");
+// properties.put("retries", DefaultProConfig.RETRIES);
+// properties.put("linger.ms", DefaultProConfig.LINGER_MS);
+// properties.put("request.timeout.ms", DefaultProConfig.REQUEST_TIMEOUT_MS);
+// properties.put("batch.size", DefaultProConfig.BATCH_SIZE);
+// properties.put("buffer.memory", DefaultProConfig.BUFFER_MEMORY);
+// properties.put("max.request.size", DefaultProConfig.MAX_REQUEST_SIZE);
+
+ properties.put("security.protocol", "SSL");
+// properties.put("ssl.keystore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.keystore.jks");
+ properties.put("ssl.keystore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\keystore.jks");
+ properties.put("ssl.keystore.password", "galaxy2019");
+// properties.put("ssl.truststore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.truststore.jks");
+ properties.put("ssl.truststore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\truststore.jks");
+ properties.put("ssl.truststore.password", "galaxy2019");
+ properties.put("ssl.key.password", "galaxy2019");
+ properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+
+ Producer<String, String> producer = new KafkaProducer<String, String>(properties);
+
+ producer.send(new ProducerRecord<>("test", "hello!"), new Callback() {
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ if (exception != null) {
+ logger.error("写入test出现异常", exception);
+ }
+ }
+ });
+
+ producer.close();
+ }
+}
diff --git a/src/test/java/com/zdjizhi/LocationTest.java b/src/test/java/com/zdjizhi/LocationTest.java
new file mode 100644
index 0000000..e7b2d15
--- /dev/null
+++ b/src/test/java/com/zdjizhi/LocationTest.java
@@ -0,0 +1,28 @@
+package com.zdjizhi;
+
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.IpLookup;
+import org.junit.Test;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi
+ * @Description:
+ * @date 2021/8/1811:34
+ */
+public class LocationTest {
+ private static IpLookup ipLookup = new IpLookup.Builder(false)
+ .loadDataFileV4("D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ip_v4.mmdb")
+ .loadDataFileV6("D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ip_v6.mmdb")
+ .loadDataFilePrivateV4("D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ip_private_v4.mmdb")
+ .loadDataFilePrivateV6("D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ip_private_v6.mmdb")
+ .build();
+
+ @Test
+ public void IpLocationTest() {
+ System.out.println(ipLookup.cityLookupDetail("24.241.112.0"));
+ System.out.println(ipLookup.cityLookupDetail("1.1.1.1"));
+ System.out.println(ipLookup.cityLookupDetail("192.168.50.58"));
+ System.out.println(ipLookup.cityLookupDetail("2600:1700:9010::"));
+ }
+}
diff --git a/target/classes/com/zdjizhi/bean/FileMeta.class b/target/classes/com/zdjizhi/bean/FileMeta.class
new file mode 100644
index 0000000..a392040
--- /dev/null
+++ b/target/classes/com/zdjizhi/bean/FileMeta.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/bean/SourceList.class b/target/classes/com/zdjizhi/bean/SourceList.class
new file mode 100644
index 0000000..ceb238c
--- /dev/null
+++ b/target/classes/com/zdjizhi/bean/SourceList.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/common/FlowWriteConfig.class b/target/classes/com/zdjizhi/common/FlowWriteConfig.class
new file mode 100644
index 0000000..2f93a42
--- /dev/null
+++ b/target/classes/com/zdjizhi/common/FlowWriteConfig.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/topology/LogFlowWriteTopology.class b/target/classes/com/zdjizhi/topology/LogFlowWriteTopology.class
new file mode 100644
index 0000000..05c4bd8
--- /dev/null
+++ b/target/classes/com/zdjizhi/topology/LogFlowWriteTopology.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/app/AppUtils$1.class b/target/classes/com/zdjizhi/utils/app/AppUtils$1.class
new file mode 100644
index 0000000..c2518be
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/app/AppUtils$1.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/app/AppUtils.class b/target/classes/com/zdjizhi/utils/app/AppUtils.class
new file mode 100644
index 0000000..3c6329f
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/app/AppUtils.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/exception/FlowWriteException.class b/target/classes/com/zdjizhi/utils/exception/FlowWriteException.class
new file mode 100644
index 0000000..d921793
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/exception/FlowWriteException.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/fast/TransFormFast.class b/target/classes/com/zdjizhi/utils/fast/TransFormFast.class
new file mode 100644
index 0000000..db6b403
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/fast/TransFormFast.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/fast/TransFunctionFast.class b/target/classes/com/zdjizhi/utils/fast/TransFunctionFast.class
new file mode 100644
index 0000000..13d280d
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/fast/TransFunctionFast.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/functions/DealFileProcessFunction$1.class b/target/classes/com/zdjizhi/utils/functions/DealFileProcessFunction$1.class
new file mode 100644
index 0000000..4ea09cc
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/functions/DealFileProcessFunction$1.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/functions/DealFileProcessFunction.class b/target/classes/com/zdjizhi/utils/functions/DealFileProcessFunction.class
new file mode 100644
index 0000000..c641e91
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/functions/DealFileProcessFunction.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/functions/FilterNullFunction.class b/target/classes/com/zdjizhi/utils/functions/FilterNullFunction.class
new file mode 100644
index 0000000..24e4d91
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/functions/FilterNullFunction.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/functions/MapCompletedFunction.class b/target/classes/com/zdjizhi/utils/functions/MapCompletedFunction.class
new file mode 100644
index 0000000..b0cc24d
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/functions/MapCompletedFunction.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/functions/ObjectCompletedFunction.class b/target/classes/com/zdjizhi/utils/functions/ObjectCompletedFunction.class
new file mode 100644
index 0000000..0f4959d
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/functions/ObjectCompletedFunction.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/functions/TypeMapCompletedFunction.class b/target/classes/com/zdjizhi/utils/functions/TypeMapCompletedFunction.class
new file mode 100644
index 0000000..a1eb14d
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/functions/TypeMapCompletedFunction.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/general/FileEdit.class b/target/classes/com/zdjizhi/utils/general/FileEdit.class
new file mode 100644
index 0000000..504bff4
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/general/FileEdit.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/general/SnowflakeId.class b/target/classes/com/zdjizhi/utils/general/SnowflakeId.class
new file mode 100644
index 0000000..6f66064
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/general/SnowflakeId.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/general/TransFormMap.class b/target/classes/com/zdjizhi/utils/general/TransFormMap.class
new file mode 100644
index 0000000..7310101
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/general/TransFormMap.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/general/TransFormObject.class b/target/classes/com/zdjizhi/utils/general/TransFormObject.class
new file mode 100644
index 0000000..7269fce
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/general/TransFormObject.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/general/TransFormTypeMap.class b/target/classes/com/zdjizhi/utils/general/TransFormTypeMap.class
new file mode 100644
index 0000000..8a997d0
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/general/TransFormTypeMap.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/general/TransFunction.class b/target/classes/com/zdjizhi/utils/general/TransFunction.class
new file mode 100644
index 0000000..5d9b3ad
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/general/TransFunction.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/hbase/HBaseUtils$1.class b/target/classes/com/zdjizhi/utils/hbase/HBaseUtils$1.class
new file mode 100644
index 0000000..314f309
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/hbase/HBaseUtils$1.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/hbase/HBaseUtils.class b/target/classes/com/zdjizhi/utils/hbase/HBaseUtils.class
new file mode 100644
index 0000000..9d0238d
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/hbase/HBaseUtils.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/http/HttpClientUtil.class b/target/classes/com/zdjizhi/utils/http/HttpClientUtil.class
new file mode 100644
index 0000000..903c4f8
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/http/HttpClientUtil.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/json/JsonParseUtil.class b/target/classes/com/zdjizhi/utils/json/JsonParseUtil.class
new file mode 100644
index 0000000..6a817c6
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/json/JsonParseUtil.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/json/JsonTypeUtils.class b/target/classes/com/zdjizhi/utils/json/JsonTypeUtils.class
new file mode 100644
index 0000000..a543fe3
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/json/JsonTypeUtils.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/json/TypeUtils.class b/target/classes/com/zdjizhi/utils/json/TypeUtils.class
new file mode 100644
index 0000000..a5059fd
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/json/TypeUtils.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/kafka/CertUtils.class b/target/classes/com/zdjizhi/utils/kafka/CertUtils.class
new file mode 100644
index 0000000..112eb6f
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/kafka/CertUtils.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/kafka/Consumer.class b/target/classes/com/zdjizhi/utils/kafka/Consumer.class
new file mode 100644
index 0000000..836c13d
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/kafka/Consumer.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/kafka/Producer.class b/target/classes/com/zdjizhi/utils/kafka/Producer.class
new file mode 100644
index 0000000..fc282e3
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/kafka/Producer.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/ordinary/MD5Utils.class b/target/classes/com/zdjizhi/utils/ordinary/MD5Utils.class
new file mode 100644
index 0000000..939b5d0
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/ordinary/MD5Utils.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/system/FlowWriteConfigurations.class b/target/classes/com/zdjizhi/utils/system/FlowWriteConfigurations.class
new file mode 100644
index 0000000..c99eb01
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/system/FlowWriteConfigurations.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/zookeeper/DistributedLock$LockException.class b/target/classes/com/zdjizhi/utils/zookeeper/DistributedLock$LockException.class
new file mode 100644
index 0000000..c9e6e98
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/zookeeper/DistributedLock$LockException.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/zookeeper/DistributedLock.class b/target/classes/com/zdjizhi/utils/zookeeper/DistributedLock.class
new file mode 100644
index 0000000..06610f2
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/zookeeper/DistributedLock.class
Binary files differ
diff --git a/target/classes/com/zdjizhi/utils/zookeeper/ZookeeperUtils.class b/target/classes/com/zdjizhi/utils/zookeeper/ZookeeperUtils.class
new file mode 100644
index 0000000..aadc30e
--- /dev/null
+++ b/target/classes/com/zdjizhi/utils/zookeeper/ZookeeperUtils.class
Binary files differ
diff --git a/target/classes/default_config.properties b/target/classes/default_config.properties
new file mode 100644
index 0000000..aeb0309
--- /dev/null
+++ b/target/classes/default_config.properties
@@ -0,0 +1,54 @@
+#====================Kafka Consumer====================#
+#kafka source connection timeout
+session.timeout.ms=60000
+
+#kafka source poll
+max.poll.records=3000
+
+#kafka source poll bytes
+max.partition.fetch.bytes=31457280
+#====================Kafka Producer====================#
+# number of retries for the producer
+retries=0
+
+# maximum time a batch may wait after creation before it is sent, whether or not it is full
+linger.ms=10
+
+# if no response is received before the timeout, the client resends the request when necessary
+request.timeout.ms=30000
+
+# the producer sends records in batches; batch size, default 16384
+batch.size=262144
+
+# size of the buffer the producer uses to cache messages
+#128M
+buffer.memory=134217728
+
+# maximum size of a single request sent to the Kafka broker, default 1048576
+#10M
+max.request.size=10485760
+#====================kafka default====================#
+#kafka source protocol; SSL or SASL
+kafka.source.protocol=SASL
+
+#kafka sink protocol; SSL or SASL
+kafka.sink.protocol=
+
+# Kafka SASL username
+kafka.user=admin
+
+# Kafka SASL and SSL password
+kafka.pin=galaxy2019
+#====================Topology Default====================#
+
+#hbase table name
+hbase.table.name=tsg_galaxy:relation_framedip_account
+
+# default mail charset
+mail.default.charset=UTF-8
+
+# 0: no validation, 1: strict type validation, 2: loose type validation
+log.transform.type=2
+
+# maximum time between two outputs (milliseconds)
+buffer.timeout=5000 \ No newline at end of file
diff --git a/target/classes/file_type.properties b/target/classes/file_type.properties
new file mode 100644
index 0000000..8ffc908
--- /dev/null
+++ b/target/classes/file_type.properties
@@ -0,0 +1,5 @@
+txt
+html
+eml
+jpg
+png \ No newline at end of file
diff --git a/target/classes/service_flow_config.properties b/target/classes/service_flow_config.properties
new file mode 100644
index 0000000..03e3b33
--- /dev/null
+++ b/target/classes/service_flow_config.properties
@@ -0,0 +1,73 @@
+#-------------------------------- Address configuration ------------------------------#
+
+# input (management) kafka address
+input.kafka.servers=10.3.60.3:9094
+
+# output kafka address
+output.kafka.servers=10.3.60.3:9092
+
+# zookeeper address, used to configure log_id
+zookeeper.servers=10.3.60.3:2181
+
+# hbase zookeeper address, used to connect to HBase
+hbase.zookeeper.servers=10.3.60.3:2181
+
+#-------------------------------- HTTP / IP location library ------------------------------#
+# IP location library path
+tools.library=D:/dingweiku/dat/
+
+# schema endpoint on the gateway
+schema.http=http://10.3.60.3:9999/metadata/schema/v1/fields/security_event
+
+# gateway endpoint for fetching APP_ID
+app.id.http=http://10.3.60.3:9999/open-api/appDicList
+
+#-------------------------------- Kafka consumer group ------------------------------#
+
+# oos address
+oos.servers=10.3.45.124:8057
+# input kafka topic
+input.kafka.topic=test11
+# topic for file source data
+output.kafka.topic=test-file-data
+
+percent.kafka.topic=test
+
+
+
+# consumer group that stores the consumed offsets for this topology; name it after the topology so the next run resumes without re-reading data
+group.id=flink-test-1
+
+# producer compression mode: none or snappy
+producer.kafka.compression.type=none
+
+# producer ack
+producer.ack=1
+
+#-------------------------------- Topology configuration ------------------------------#
+
+# consumer parallelism
+source.parallelism=10
+
+# transform function parallelism
+transform.parallelism=10
+
+# kafka producer parallelism
+sink.parallelism=10
+
+# data center id, range 0-63
+data.center.id.num=7
+
+# hbase cache refresh interval; 0 disables cache refresh
+hbase.tick.tuple.freq.secs=180
+
+# app_id cache refresh interval; 0 disables cache refresh
+app.tick.tuple.freq.secs=0
+
+#-------------------------------- Default values ------------------------------#
+
+# default mail charset
+mail.default.charset=UTF-8
+
+# 0: output logs as-is without completion, 1: complete the logs
+log.need.complete=1 \ No newline at end of file
diff --git a/target/log-completion-doublewrite-1214.jar b/target/log-completion-doublewrite-1214.jar
new file mode 100644
index 0000000..4ed4ff2
--- /dev/null
+++ b/target/log-completion-doublewrite-1214.jar
Binary files differ
diff --git a/target/maven-archiver/pom.properties b/target/maven-archiver/pom.properties
new file mode 100644
index 0000000..05d9bd6
--- /dev/null
+++ b/target/maven-archiver/pom.properties
@@ -0,0 +1,5 @@
+#Generated by Maven
+#Wed Dec 15 16:02:33 CST 2021
+version=1214
+groupId=com.zdjizhi
+artifactId=log-completion-doublewrite
diff --git a/target/original-log-completion-doublewrite-1214.jar b/target/original-log-completion-doublewrite-1214.jar
new file mode 100644
index 0000000..f04a166
--- /dev/null
+++ b/target/original-log-completion-doublewrite-1214.jar
Binary files differ
diff --git a/target/surefire-reports/TEST-com.zdjizhi.LocationTest.xml b/target/surefire-reports/TEST-com.zdjizhi.LocationTest.xml
new file mode 100644
index 0000000..4c98dcb
--- /dev/null
+++ b/target/surefire-reports/TEST-com.zdjizhi.LocationTest.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<testsuite tests="1" failures="0" name="com.zdjizhi.LocationTest" time="0.018" errors="0" skipped="0">
+ <properties>
+ <property name="java.runtime.name" value="Java(TM) SE Runtime Environment"/>
+ <property name="sun.boot.library.path" value="C:\Program Files\Java\jdk1.8.0_191\jre\bin"/>
+ <property name="java.vm.version" value="25.191-b12"/>
+ <property name="java.vm.vendor" value="Oracle Corporation"/>
+ <property name="maven.multiModuleProjectDirectory" value="D:\p19-etl\log-stream-doublewrite"/>
+ <property name="java.vendor.url" value="http://java.oracle.com/"/>
+ <property name="path.separator" value=";"/>
+ <property name="guice.disable.misplaced.annotation.check" value="true"/>
+ <property name="java.vm.name" value="Java HotSpot(TM) 64-Bit Server VM"/>
+ <property name="file.encoding.pkg" value="sun.io"/>
+ <property name="user.script" value=""/>
+ <property name="user.country" value="CN"/>
+ <property name="sun.java.launcher" value="SUN_STANDARD"/>
+ <property name="sun.os.patch.level" value=""/>
+ <property name="java.vm.specification.name" value="Java Virtual Machine Specification"/>
+ <property name="user.dir" value="D:\p19-etl\log-stream-doublewrite"/>
+ <property name="java.runtime.version" value="1.8.0_191-b12"/>
+ <property name="java.awt.graphicsenv" value="sun.awt.Win32GraphicsEnvironment"/>
+ <property name="java.endorsed.dirs" value="C:\Program Files\Java\jdk1.8.0_191\jre\lib\endorsed"/>
+ <property name="os.arch" value="amd64"/>
+ <property name="java.io.tmpdir" value="C:\Users\ZDJZ\AppData\Local\Temp\"/>
+ <property name="line.separator" value="
+"/>
+ <property name="java.vm.specification.vendor" value="Oracle Corporation"/>
+ <property name="user.variant" value=""/>
+ <property name="os.name" value="Windows 10"/>
+ <property name="maven.ext.class.path" value="C:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3\plugins\maven\lib\maven-event-listener.jar"/>
+ <property name="classworlds.conf" value="C:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3\plugins\maven\lib\maven3\bin\m2.conf"/>
+ <property name="sun.jnu.encoding" value="GBK"/>
+ <property name="java.library.path" value="C:\Program Files\Java\jdk1.8.0_191\bin;C:\WINDOWS\Sun\Java\bin;C:\WINDOWS\system32;C:\WINDOWS;C:\Program Files (x86)\Common Files\Oracle\Java\javapath;C:\WINDOWS\system32;C:\WINDOWS;C:\WINDOWS\System32\Wbem;C:\WINDOWS\System32\WindowsPowerShell\v1.0\;&quot;C:\Program Files\Java\jdk1.8.0_191\bin;&quot;;&quot;C:\Program Files\Java\jdk1.8.0_191\jre\bin;D:\apache-maven-3.5.4\bin;C:\WINDOWS\System32\OpenSSH\&quot;;D:\git\bin;D:\git\mingw64\libexec\git-core;D:\git\mingw64\bin;D:\Git\cmd;D:\hadoop\hadoop-2.7.3\bin;C:\Users\ZDJZ\AppData\Local\Microsoft\WindowsApps;D:\IntelliJ IDEA 2020.1\bin;;D:\git\bin;D:\git\mingw64\libexec\git-core;D:\git\mingw64\bin;;C:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3\bin;;."/>
+ <property name="maven.conf" value="C:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3\plugins\maven\lib\maven3/conf"/>
+ <property name="java.specification.name" value="Java Platform API Specification"/>
+ <property name="java.class.version" value="52.0"/>
+ <property name="sun.management.compiler" value="HotSpot 64-Bit Tiered Compilers"/>
+ <property name="os.version" value="10.0"/>
+ <property name="user.home" value="C:\Users\ZDJZ"/>
+ <property name="user.timezone" value="Asia/Shanghai"/>
+ <property name="java.awt.printerjob" value="sun.awt.windows.WPrinterJob"/>
+ <property name="java.specification.version" value="1.8"/>
+ <property name="file.encoding" value="UTF-8"/>
+ <property name="user.name" value="ZDJZ"/>
+ <property name="java.class.path" value="C:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3\plugins\maven\lib\maven3\boot\plexus-classworlds-2.6.0.jar;C:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3\lib\idea_rt.jar"/>
+ <property name="java.vm.specification.version" value="1.8"/>
+ <property name="sun.arch.data.model" value="64"/>
+ <property name="java.home" value="C:\Program Files\Java\jdk1.8.0_191\jre"/>
+ <property name="sun.java.command" value="org.codehaus.classworlds.Launcher -Didea.version2019.3 -s D:\apache-maven-3.5.4\conf\settings.xml package"/>
+ <property name="java.specification.vendor" value="Oracle Corporation"/>
+ <property name="user.language" value="zh"/>
+ <property name="awt.toolkit" value="sun.awt.windows.WToolkit"/>
+ <property name="java.vm.info" value="mixed mode"/>
+ <property name="java.version" value="1.8.0_191"/>
+ <property name="java.ext.dirs" value="C:\Program Files\Java\jdk1.8.0_191\jre\lib\ext;C:\WINDOWS\Sun\Java\lib\ext"/>
+ <property name="sun.boot.class.path" value="C:\Program Files\Java\jdk1.8.0_191\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\rt.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\sunrsasign.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_191\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_191\jre\classes"/>
+ <property name="java.vendor" value="Oracle Corporation"/>
+ <property name="maven.home" value="C:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3\plugins\maven\lib\maven3"/>
+ <property name="file.separator" value="\"/>
+ <property name="idea.version2019.3" value="true"/>
+ <property name="java.vendor.url.bug" value="http://bugreport.sun.com/bugreport/"/>
+ <property name="sun.cpu.endian" value="little"/>
+ <property name="sun.io.unicode.encoding" value="UnicodeLittle"/>
+ <property name="sun.desktop" value="windows"/>
+ <property name="sun.cpu.isalist" value="amd64"/>
+ </properties>
+ <testcase classname="com.zdjizhi.LocationTest" name="IpLocationTest" time="0.018"/>
+</testsuite> \ No newline at end of file
diff --git a/target/surefire-reports/com.zdjizhi.LocationTest.txt b/target/surefire-reports/com.zdjizhi.LocationTest.txt
new file mode 100644
index 0000000..f8ff7c9
--- /dev/null
+++ b/target/surefire-reports/com.zdjizhi.LocationTest.txt
@@ -0,0 +1,4 @@
+-------------------------------------------------------------------------------
+Test set: com.zdjizhi.LocationTest
+-------------------------------------------------------------------------------
+Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.127 sec
diff --git a/target/test-classes/com/zdjizhi/FIleTypeTest.class b/target/test-classes/com/zdjizhi/FIleTypeTest.class
new file mode 100644
index 0000000..d6cc27e
--- /dev/null
+++ b/target/test-classes/com/zdjizhi/FIleTypeTest.class
Binary files differ
diff --git a/target/test-classes/com/zdjizhi/KafkaTest$1.class b/target/test-classes/com/zdjizhi/KafkaTest$1.class
new file mode 100644
index 0000000..94894f7
--- /dev/null
+++ b/target/test-classes/com/zdjizhi/KafkaTest$1.class
Binary files differ
diff --git a/target/test-classes/com/zdjizhi/KafkaTest.class b/target/test-classes/com/zdjizhi/KafkaTest.class
new file mode 100644
index 0000000..00972d6
--- /dev/null
+++ b/target/test-classes/com/zdjizhi/KafkaTest.class
Binary files differ
diff --git a/target/test-classes/com/zdjizhi/LocationTest.class b/target/test-classes/com/zdjizhi/LocationTest.class
new file mode 100644
index 0000000..8478cf7
--- /dev/null
+++ b/target/test-classes/com/zdjizhi/LocationTest.class
Binary files differ