author    qidaijie <[email protected]>  2022-11-10 19:28:15 +0800
committer qidaijie <[email protected]>  2022-11-10 19:28:15 +0800
commit    be73655ec4c5d9dd7930ed39e797dfee520a02e1 (patch)
tree      0bf3d790aef6dd702ea4a2292a8aac53a6f0524a
parent    e7a5ecb4f7a338ed156f6f454947728c293617f7 (diff)
Submit the initial version of the ETL feature supporting dynamic knowledge base loading. (GAL-223)
-rw-r--r-- pom.xml 2
-rw-r--r-- properties/default_config.properties 22
-rw-r--r-- properties/service_flow_config.properties 46
-rw-r--r-- src/main/java/com/zdjizhi/common/CustomFile.java 28
-rw-r--r-- src/main/java/com/zdjizhi/common/FlowWriteConfig.java 39
-rw-r--r-- src/main/java/com/zdjizhi/tools/functions/MapCompletedFunction.java 23
-rw-r--r-- src/main/java/com/zdjizhi/tools/functions/TypeMapCompletedFunction.java 23
-rw-r--r-- src/main/java/com/zdjizhi/tools/functions/broadcast/KnowledgeBaseSource.java 174
-rw-r--r-- src/main/java/com/zdjizhi/tools/functions/filter/FilterNull.java (renamed from src/main/java/com/zdjizhi/tools/functions/FilterNullFunction.java) 4
-rw-r--r-- src/main/java/com/zdjizhi/tools/functions/map/MapCompleted.java 74
-rw-r--r-- src/main/java/com/zdjizhi/tools/functions/map/TypeMapCompleted.java 74
-rw-r--r-- src/main/java/com/zdjizhi/tools/general/IpLookup.java 220
-rw-r--r-- src/main/java/com/zdjizhi/tools/general/TransForm.java (renamed from src/main/java/com/zdjizhi/tools/general/TransFormMap.java) 49
-rw-r--r-- src/main/java/com/zdjizhi/tools/general/TransFormTypeMap.java 134
-rw-r--r-- src/main/java/com/zdjizhi/tools/general/TransFunction.java 22
-rw-r--r-- src/main/java/com/zdjizhi/tools/hdfs/HdfsFileUtils.java 78
-rw-r--r-- src/main/java/com/zdjizhi/tools/http/HttpClientUtils.java 320
-rw-r--r-- src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java 6
-rw-r--r-- src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java 2
-rw-r--r-- src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java 57
-rw-r--r-- src/main/logback.xml 42
-rw-r--r-- src/test/java/com/zdjizhi/EncryptorTest.java 35
-rw-r--r-- src/test/java/com/zdjizhi/FunctionTest.java 52
-rw-r--r-- src/test/java/com/zdjizhi/HBaseTest.java 54
-rw-r--r-- src/test/java/com/zdjizhi/function/Base64Test.java 77
-rw-r--r-- src/test/java/com/zdjizhi/function/EncryptorTest.java 38
-rw-r--r-- src/test/java/com/zdjizhi/function/HBaseTest.java 245
-rw-r--r-- src/test/java/com/zdjizhi/hdfs/FileUtilsTest.java 68
-rw-r--r-- src/test/java/com/zdjizhi/hos/hosUtilsTest.java 101
-rw-r--r-- src/test/java/com/zdjizhi/json/JsonPathTest.java 85
-rw-r--r-- src/test/java/com/zdjizhi/json/JsonTest.java 23
-rw-r--r-- src/test/java/com/zdjizhi/json/NewSchemaTest.java 102
-rw-r--r-- src/test/java/com/zdjizhi/json/OldSchemaTest.java (renamed from src/test/java/com/zdjizhi/nacos/SchemaListener.java) 53
-rw-r--r-- src/test/java/com/zdjizhi/nacos/NacosTest.java 2
-rw-r--r-- src/test/java/com/zdjizhi/nacos/SchemaListenerTest.java 116
35 files changed, 1919 insertions, 571 deletions
diff --git a/pom.xml b/pom.xml
index 4395a3e..5a47983 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>log-completion-schema</artifactId>
- <version>20221103-Base64</version>
+ <version>20221110-KNOWLEDGEBASE</version>
<name>log-completion-schema</name>
<url>http://www.example.com</url>
diff --git a/properties/default_config.properties b/properties/default_config.properties
index 9495f9c..06021b9 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -44,7 +44,27 @@ nacos.username=kANxu/Zi5rBnZVxa5zAjrQ==
nacos.pin=YPIBDIXjJUtVBjjk2op0Dg==
#nacos group
-nacos.group=Galaxy
+nacos.schema.group=Galaxy
+
+#nacos group
+nacos.knowledgebase.group=DEFAULT_GROUP
+
+#================= HTTP configuration ====================#
+#max connection
+http.pool.max.connection=400
+
+#one route max connection
+http.pool.max.per.route=80
+
+#connect timeout(ms)
+http.pool.connect.timeout=60000
+
+#request timeout(ms)
+http.pool.request.timeout=60000
+
+#response timeout(ms)
+http.pool.response.timeout=60000
+
#====================Topology Default====================#
#hbase radius relation table name
hbase.radius.table.name=tsg_galaxy:relation_framedip_account
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index f76f57d..4289e44 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,5 +1,4 @@
#--------------------------------Address configuration------------------------------#
-
#Management Kafka address
source.kafka.servers=192.168.44.12:9094
@@ -12,21 +11,39 @@ zookeeper.servers=192.168.44.12:2181
#HBase ZooKeeper address, used to connect to HBase
hbase.zookeeper.servers=192.168.44.12:2181
+#HDFS address, used to fetch the location databases
+hdfs.uri=hdfs://192.168.40.151:9000
+
#--------------------------------HTTP / location database configuration------------------------------#
-#Location database path
-tools.library=D:\\workerspace\\dat\\
+#hos token
+hos.token=c21f969b5f03d33d43e04f8f136e7682
+
+#File system type for storing the location databases, hdfs or local
+knowledgebase.file.system.type=hdfs
+
+#Location database path; fill in the path that matches the file.system.type setting.
+knowledgebase.library=/qitest/
+
+#Tools library path, which stores key files and the like.
+tools.library=E:\\workerspace\\dat\\
#--------------------------------Nacos configuration------------------------------#
#Nacos address
nacos.server=192.168.44.12:8848
-#nacos namespace
+#Schema namespace name
nacos.schema.namespace=test
-#nacos data id
-nacos.data.id=session_record.json
+#Schema data id name
+nacos.schema.data.id=session_record.json
+
+#Knowledge base namespace name
+nacos.knowledgebase.namespace=public
+
+#Knowledge base data id name
+nacos.knowledgebase.data.id=knowledge_base.json
-#--------------------------------Kafka consumer group configuration------------------------------#
+#--------------------------------Kafka consumer/producer configuration------------------------------#
#Kafka topic for incoming data
source.kafka.topic=test
@@ -35,10 +52,7 @@ source.kafka.topic=test
sink.kafka.topic=test-result
#Consumer group for the source topic; stores the consumer offsets for this spout id (can be named after the topology) and determines where offsets are kept so the next read does not re-consume data;
-group.id=type-test-20220810-1
-
-#Producer compression mode, none or snappy
-producer.kafka.compression.type=none
+group.id=session-record-log-20211105-1
#--------------------------------Topology configuration------------------------------#
@@ -51,13 +65,17 @@ transform.parallelism=1
#Kafka producer parallelism
sink.parallelism=1
-#Data center id, value range (0-63)
-data.center.id.num=16
+#Data center id, value range (0-31)
+data.center.id.num=0
#HBase refresh interval in seconds; 0 means the cache is not refreshed
-hbase.tick.tuple.freq.secs=60
+hbase.tick.tuple.freq.secs=180
#--------------------------------Default value configuration------------------------------#
#0: output the log as-is without completion, 1: completion required
log.need.complete=1
+
+#Producer compression mode, none or snappy
+producer.kafka.compression.type=none
+
diff --git a/src/main/java/com/zdjizhi/common/CustomFile.java b/src/main/java/com/zdjizhi/common/CustomFile.java
new file mode 100644
index 0000000..1c80f87
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/CustomFile.java
@@ -0,0 +1,28 @@
+package com.zdjizhi.common;
+
+import java.io.Serializable;
+
+public class CustomFile implements Serializable {
+
+ private String fileName;
+
+ private byte[] content;
+
+ public String getFileName() {
+ return fileName;
+ }
+
+ public void setFileName(String fileName) {
+ this.fileName = fileName;
+ }
+
+ public byte[] getContent() {
+ return content;
+ }
+
+ public void setContent(byte[] content) {
+ this.content = content;
+ }
+
+
+}
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index 7f92b02..e1804c9 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -16,6 +16,17 @@ public class FlowWriteConfig {
}
public static final int IF_PARAM_LENGTH = 3;
+
+ /**
+ * Default separator for the location databases
+ */
+ public static final String LOCATION_SEPARATOR = ".";
+
+ /**
+ * Default file system identifier
+ */
+ public static final String FILE_SYSTEM_TYPE = "hdfs";
+
/**
* Fields carrying this marker are invalid and are excluded from the final log fields
*/
@@ -48,8 +59,11 @@ public class FlowWriteConfig {
*/
public static final String NACOS_SERVER = FlowWriteConfigurations.getStringProperty(0, "nacos.server");
public static final String NACOS_SCHEMA_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.schema.namespace");
- public static final String NACOS_DATA_ID = FlowWriteConfigurations.getStringProperty(0, "nacos.data.id");
- public static final String NACOS_GROUP = FlowWriteConfigurations.getStringProperty(1, "nacos.group");
+ public static final String NACOS_SCHEMA_DATA_ID = FlowWriteConfigurations.getStringProperty(0, "nacos.schema.data.id");
+ public static final String NACOS_KNOWLEDGEBASE_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.knowledgebase.namespace");
+ public static final String NACOS_KNOWLEDGEBASE_DATA_ID = FlowWriteConfigurations.getStringProperty(0, "nacos.knowledgebase.data.id");
+ public static final String NACOS_KNOWLEDGEBASE_GROUP = FlowWriteConfigurations.getStringProperty(1, "nacos.knowledgebase.group");
+ public static final String NACOS_SCHEMA_GROUP = FlowWriteConfigurations.getStringProperty(1, "nacos.schema.group");
public static final String NACOS_USERNAME = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "nacos.username"));
public static final String NACOS_PIN = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "nacos.pin"));
@@ -76,6 +90,26 @@ public class FlowWriteConfig {
public static final String HBASE_GTPC_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.gtpc.table.name");
/**
+ * HDFS
+ */
+ public static final String HDFS_URI = FlowWriteConfigurations.getStringProperty(0, "hdfs.uri");
+ public static final String KNOWLEDGEBASE_FILE_SYSTEM_TYPE = FlowWriteConfigurations.getStringProperty(0, "knowledgebase.file.system.type");
+
+ /**
+ * HOS
+ */
+ public static final String HOS_TOKEN = FlowWriteConfigurations.getStringProperty(0, "hos.token");
+
+ /**
+ * HTTP
+ */
+ public static final Integer HTTP_POOL_MAX_CONNECTION = FlowWriteConfigurations.getIntProperty(1, "http.pool.max.connection");
+ public static final Integer HTTP_POOL_MAX_PER_ROUTE = FlowWriteConfigurations.getIntProperty(1, "http.pool.max.per.route");
+ public static final Integer HTTP_POOL_REQUEST_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "http.pool.request.timeout");
+ public static final Integer HTTP_POOL_CONNECT_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "http.pool.connect.timeout");
+ public static final Integer HTTP_POOL_RESPONSE_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "http.pool.response.timeout");
+
+ /**
* kafka common
*/
public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.user"));
@@ -114,6 +148,7 @@ public class FlowWriteConfig {
public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.servers");
public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers");
public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");
+ public static final String KNOWLEDGEBASE_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "knowledgebase.library");
public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/tools/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/tools/functions/MapCompletedFunction.java
deleted file mode 100644
index 6b8df35..0000000
--- a/src/main/java/com/zdjizhi/tools/functions/MapCompletedFunction.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.zdjizhi.tools.functions;
-
-
-import com.zdjizhi.tools.general.TransFormMap;
-import org.apache.flink.api.common.functions.MapFunction;
-
-import java.util.Map;
-
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/5/2715:01
- */
-public class MapCompletedFunction implements MapFunction<Map<String, Object>, String> {
-
- @Override
- @SuppressWarnings("unchecked")
- public String map(Map<String, Object> logs) {
- return TransFormMap.dealCommonMessage(logs);
- }
-}
diff --git a/src/main/java/com/zdjizhi/tools/functions/TypeMapCompletedFunction.java b/src/main/java/com/zdjizhi/tools/functions/TypeMapCompletedFunction.java
deleted file mode 100644
index 1b8dd6a..0000000
--- a/src/main/java/com/zdjizhi/tools/functions/TypeMapCompletedFunction.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.zdjizhi.tools.functions;
-
-import com.zdjizhi.tools.general.TransFormTypeMap;
-import org.apache.flink.api.common.functions.MapFunction;
-
-import java.util.Map;
-
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/5/2715:01
- */
-public class TypeMapCompletedFunction implements MapFunction<Map<String, Object>, String> {
-
- @Override
- @SuppressWarnings("unchecked")
- public String map(Map<String, Object> logs) {
-
- return TransFormTypeMap.dealCommonMessage(logs);
- }
-}
diff --git a/src/main/java/com/zdjizhi/tools/functions/broadcast/KnowledgeBaseSource.java b/src/main/java/com/zdjizhi/tools/functions/broadcast/KnowledgeBaseSource.java
new file mode 100644
index 0000000..fa32b70
--- /dev/null
+++ b/src/main/java/com/zdjizhi/tools/functions/broadcast/KnowledgeBaseSource.java
@@ -0,0 +1,174 @@
+package com.zdjizhi.tools.functions.broadcast;
+
+import cn.hutool.core.io.FileUtil;
+import cn.hutool.core.io.IoUtil;
+import cn.hutool.json.JSONObject;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.nacos.api.NacosFactory;
+import com.alibaba.nacos.api.PropertyKeyConst;
+import com.alibaba.nacos.api.config.ConfigService;
+import com.alibaba.nacos.api.config.listener.Listener;
+import com.alibaba.nacos.api.exception.NacosException;
+import com.google.common.base.Joiner;
+import com.jayway.jsonpath.JsonPath;
+import com.zdjizhi.common.CustomFile;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.tools.hdfs.HdfsFileUtils;
+import com.zdjizhi.tools.http.HttpClientUtils;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.http.Header;
+import org.apache.http.message.BasicHeader;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.tools.functions
+ * @Description:
+ * @date 2022/11/3 10:37
+ */
+public class KnowledgeBaseSource extends RichSourceFunction<List<CustomFile>> {
+ private static final Log logger = LogFactory.get();
+ //default file system identifier
+ private static final String FILE_SYSTEM_TYPE = "hdfs";
+ //JsonPath expression
+ private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined','asn_v4','asn_v6'])].['name','sha256','format','path']";
+ //sha256 cache for each file
+ private static HashMap<String, String> knowledgeMetaCache = new HashMap<>(16);
+ //properties for the Nacos connection
+ private static Properties properties = new Properties();
+ //Nacos config service
+ private ConfigService configService;
+ private boolean isRunning = true;
+
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ properties.setProperty(PropertyKeyConst.SERVER_ADDR, FlowWriteConfig.NACOS_SERVER);
+ properties.setProperty(PropertyKeyConst.NAMESPACE, FlowWriteConfig.NACOS_KNOWLEDGEBASE_NAMESPACE);
+ properties.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME);
+ properties.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN);
+ configService = NacosFactory.createConfigService(properties);
+ }
+
+ @Override
+ public void run(SourceContext<List<CustomFile>> ctx) {
+ try {
+ String dataId = FlowWriteConfig.NACOS_KNOWLEDGEBASE_DATA_ID;
+ String group = FlowWriteConfig.NACOS_KNOWLEDGEBASE_GROUP;
+ String configMsg = configService.getConfig(dataId, group, 5000);
+ if (StringUtil.isNotBlank(configMsg)) {
+ ArrayList<Object> metaList = JsonPath.parse(configMsg).read(EXPR);
+ if (metaList.size() >= 1) {
+ for (Object metadata : metaList) {
+ JSONObject knowledgeJson = new JSONObject(metadata, false, true);
+ String fileName = Joiner.on(FlowWriteConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
+ knowledgeJson.getStr("format"));
+ String sha256 = knowledgeJson.getStr("sha256");
+ knowledgeMetaCache.put(fileName, sha256);
+ }
+ }
+ }
+
+ configService.addListener(dataId, group, new Listener() {
+ @Override
+ public Executor getExecutor() {
+ return null;
+ }
+
+ @Override
+ public void receiveConfigInfo(String configMsg) {
+ if (StringUtil.isNotBlank(configMsg)) {
+ ArrayList<CustomFile> customFiles = new ArrayList<>();
+ ArrayList<Object> metaList = JsonPath.parse(configMsg).read(EXPR);
+ if (metaList.size() >= 1) {
+ for (Object metadata : metaList) {
+ JSONObject knowledgeJson = new JSONObject(metadata, false, true);
+ String fileName = Joiner.on(FlowWriteConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
+ knowledgeJson.getStr("format"));
+ String sha256 = knowledgeJson.getStr("sha256");
+ String filePath = knowledgeJson.getStr("path");
+ if (knowledgeMetaCache.containsKey(fileName)) {
+ if (!sha256.equals(knowledgeMetaCache.get(fileName))) {
+ CustomFile customFile = loadKnowledge(fileName, filePath);
+ customFiles.add(customFile);
+ knowledgeMetaCache.put(fileName, sha256);
+
+ }
+ } else {
+ knowledgeMetaCache.put(fileName, sha256);
+ CustomFile customFile = loadKnowledge(fileName, filePath);
+ customFiles.add(customFile);
+ }
+ }
+ }
+ ctx.collect(customFiles);
+ }
+ }
+ });
+
+ while (isRunning) {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ } catch (NacosException e) {
+ logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage());
+ }
+ }
+
+ /**
+ * Download the file from the address returned by the knowledge base and, depending on the configured file system type, update the corresponding local or HDFS copy
+ *
+ * @param fileName file name
+ * @param filePath file download address
+ * @return the downloaded file wrapped as a CustomFile
+ */
+ private CustomFile loadKnowledge(String fileName, String filePath) {
+ InputStream inputStream = null;
+ FileOutputStream outputStream = null;
+ CustomFile customFile = new CustomFile();
+ try {
+ customFile.setFileName(fileName);
+ Header header = new BasicHeader("token", FlowWriteConfig.HOS_TOKEN);
+ HttpClientUtils httpClientUtils = new HttpClientUtils();
+ inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
+ if (FILE_SYSTEM_TYPE.equals(FlowWriteConfig.KNOWLEDGEBASE_FILE_SYSTEM_TYPE)) {
+ byte[] bytes = IOUtils.toByteArray(inputStream);
+ HdfsFileUtils.uploadFileByBytes(FlowWriteConfig.KNOWLEDGEBASE_LIBRARY + fileName, bytes);
+ customFile.setContent(bytes);
+ } else {
+ FileUtil.mkdir(FlowWriteConfig.KNOWLEDGEBASE_LIBRARY);
+ File file = new File(FlowWriteConfig.KNOWLEDGEBASE_LIBRARY.concat(File.separator).concat(fileName));
+ outputStream = new FileOutputStream(file);
+ byte[] bytes = IOUtils.toByteArray(inputStream);
+ customFile.setContent(bytes);
+ inputStream = new ByteArrayInputStream(customFile.getContent());
+ IoUtil.copy(inputStream, outputStream);
+ }
+ } catch (IOException ioException) {
+ ioException.printStackTrace();
+ } finally {
+ IOUtils.closeQuietly(inputStream);
+ IOUtils.closeQuietly(outputStream);
+ }
+ return customFile;
+ }
+
+ @Override
+ public void cancel() {
+ this.isRunning = false;
+ }
+}
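The EXPR JsonPath above drives the whole reload cycle: it filters the knowledge_base.json metadata published in Nacos down to the six location-database entries whose version is latest and projects only name, sha256, format, and path. A minimal, self-contained sketch of that extraction (the payload and values below are invented for illustration, not the real Nacos content):

import com.jayway.jsonpath.JsonPath;
import java.util.List;
import java.util.Map;

public class KnowledgeBaseMetaDemo {
    // same shape as the EXPR constant above, shortened to two names for brevity
    private static final String EXPR =
            "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','asn_v4'])].['name','sha256','format','path']";

    public static void main(String[] args) {
        // hypothetical knowledge_base.json payload; only the 'latest' entry passes the filter
        String configMsg = "["
                + "{\"name\":\"ip_v4_built_in\",\"version\":\"latest\",\"format\":\"mmdb\","
                + "\"sha256\":\"abc123\",\"path\":\"http://hos.example/ip_v4_built_in.mmdb\"},"
                + "{\"name\":\"asn_v4\",\"version\":\"1.0\",\"format\":\"mmdb\","
                + "\"sha256\":\"def456\",\"path\":\"http://hos.example/asn_v4.mmdb\"}]";

        List<Map<String, Object>> metaList = JsonPath.parse(configMsg).read(EXPR);
        for (Map<String, Object> meta : metaList) {
            // the source joins name and format with LOCATION_SEPARATOR, e.g. ip_v4_built_in.mmdb
            String fileName = meta.get("name") + "." + meta.get("format");
            System.out.println(fileName + " -> sha256 " + meta.get("sha256") + ", path " + meta.get("path"));
        }
    }
}

The sha256 values feed knowledgeMetaCache, so a file is re-downloaded only when its hash changes.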
diff --git a/src/main/java/com/zdjizhi/tools/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/tools/functions/filter/FilterNull.java
index e5f8526..47d6201 100644
--- a/src/main/java/com/zdjizhi/tools/functions/FilterNullFunction.java
+++ b/src/main/java/com/zdjizhi/tools/functions/filter/FilterNull.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.tools.functions;
+package com.zdjizhi.tools.functions.filter;
import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.functions.FilterFunction;
@@ -9,7 +9,7 @@ import org.apache.flink.api.common.functions.FilterFunction;
* @Description:
* @date 2021/5/2715:01
*/
-public class FilterNullFunction implements FilterFunction<String> {
+public class FilterNull implements FilterFunction<String> {
@Override
public boolean filter(String message) {
return StringUtil.isNotBlank(message);
diff --git a/src/main/java/com/zdjizhi/tools/functions/map/MapCompleted.java b/src/main/java/com/zdjizhi/tools/functions/map/MapCompleted.java
new file mode 100644
index 0000000..0559663
--- /dev/null
+++ b/src/main/java/com/zdjizhi/tools/functions/map/MapCompleted.java
@@ -0,0 +1,74 @@
+package com.zdjizhi.tools.functions.map;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.CustomFile;
+import com.zdjizhi.tools.general.IpLookup;
+import com.zdjizhi.tools.general.TransForm;
+import com.zdjizhi.tools.json.JsonParseUtil;
+import com.zdjizhi.utils.JsonMapper;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/5/2715:01
+ */
+public class MapCompleted extends BroadcastProcessFunction<Map<String, Object>, List<CustomFile>, String> {
+ private static final Log logger = LogFactory.get();
+
+ private static IpLookup ipLookup;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ ipLookup = new IpLookup();
+ ipLookup.loadIpLookup();
+ }
+
+ @Override
+ public void processElement(Map<String, Object> value, ReadOnlyContext readOnlyContext, Collector<String> out) {
+ try {
+ JsonParseUtil.dropJsonField(value);
+ for (String[] strings : JsonParseUtil.getJobList()) {
+ //value of the source log field
+ Object logValue = JsonParseUtil.getValue(value, strings[0]);
+ //key of the log field the result is mapped to
+ String appendToKey = strings[1];
+ //field that selects the transform function
+ String function = strings[2];
+ //value of the extra parameter
+ String param = strings[3];
+
+ //original value of the target log field
+ Object appendToValue = JsonParseUtil.getValue(value, appendToKey);
+
+ TransForm.functionSet(function, value, appendToKey, appendToValue, logValue, param, ipLookup);
+ }
+
+ out.collect(JsonMapper.toJsonString(value));
+
+ } catch (RuntimeException e) {
+ logger.error("TransForm log failed ( The field type is not verified ),The exception is :" + e);
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void processBroadcastElement(List<CustomFile> customFiles, Context context, Collector<String> out) throws Exception {
+ if (!customFiles.isEmpty()) {
+ for (CustomFile customFile : customFiles) {
+ logger.info("定位库名称:" + customFile.getFileName() + ",更新文件字节长度:" + customFile.getContent().length);
+ ipLookup.updateIpLookup(customFile.getFileName(), customFile.getContent());
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/tools/functions/map/TypeMapCompleted.java b/src/main/java/com/zdjizhi/tools/functions/map/TypeMapCompleted.java
new file mode 100644
index 0000000..c10ee68
--- /dev/null
+++ b/src/main/java/com/zdjizhi/tools/functions/map/TypeMapCompleted.java
@@ -0,0 +1,74 @@
+package com.zdjizhi.tools.functions.map;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.CustomFile;
+import com.zdjizhi.tools.general.IpLookup;
+import com.zdjizhi.tools.general.TransForm;
+import com.zdjizhi.tools.json.JsonParseUtil;
+import com.zdjizhi.utils.JsonMapper;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/5/2715:01
+ */
+public class TypeMapCompleted extends BroadcastProcessFunction<Map<String, Object>, List<CustomFile>, String> {
+ private static final Log logger = LogFactory.get();
+
+ private static IpLookup ipLookup;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ ipLookup = new IpLookup();
+ ipLookup.loadIpLookup();
+ }
+
+ @Override
+ public void processElement(Map<String, Object> value, ReadOnlyContext readOnlyContext, Collector<String> out) throws Exception {
+ try {
+ Map<String, Object> jsonMap = JsonParseUtil.typeTransform(value);
+ for (String[] strings : JsonParseUtil.getJobList()) {
+ //value of the source log field
+ Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
+ //key of the log field the result is mapped to
+ String appendToKey = strings[1];
+ //field that selects the transform function
+ String function = strings[2];
+ //value of the extra parameter
+ String param = strings[3];
+
+ //original value of the target log field
+ Object appendToValue = JsonParseUtil.getValue(jsonMap, appendToKey);
+
+ TransForm.functionSet(function, jsonMap, appendToKey, appendToValue, logValue, param, ipLookup);
+ }
+
+ out.collect(JsonMapper.toJsonString(jsonMap));
+
+ } catch (RuntimeException e) {
+ logger.error("TransForm logs failed( The field type is verified ),The exception is :" + e);
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void processBroadcastElement(List<CustomFile> customFiles, Context context, Collector<String> out) throws Exception {
+ if (!customFiles.isEmpty()) {
+ for (CustomFile customFile : customFiles) {
+ logger.info("定位库名称:" + customFile.getFileName() + ",更新文件字节长度:" + customFile.getContent().length);
+ ipLookup.updateIpLookup(customFile.getFileName(), customFile.getContent());
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/tools/general/IpLookup.java b/src/main/java/com/zdjizhi/tools/general/IpLookup.java
new file mode 100644
index 0000000..ed92de0
--- /dev/null
+++ b/src/main/java/com/zdjizhi/tools/general/IpLookup.java
@@ -0,0 +1,220 @@
+package com.zdjizhi.tools.general;
+
+import cn.hutool.core.io.file.FileReader;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Joiner;
+import com.maxmind.db.CHMCache;
+import com.maxmind.db.Reader;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.tools.hdfs.HdfsFileUtils;
+import com.zdjizhi.utils.IPUtil;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.commons.io.IOUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+
+public class IpLookup {
+ private static final Log logger = LogFactory.get();
+ private static final String LOCATION_SEPARATOR = ".";
+ private static final String PRIVATE_IP = "Private IP";
+ private static final String UNKNOWN = "";
+
+ private static final String[] FILES = {"ip_v4_built_in.mmdb",
+ "ip_v4_user_defined.mmdb",
+ "ip_v6_built_in.mmdb",
+ "ip_v6_user_defined.mmdb",
+ "asn_v4.mmdb",
+ "asn_v6.mmdb"};
+
+ private Reader ipLocationPublicReaderV4 = null;
+ private Reader ipLocationPublicReaderV6 = null;
+ private Reader ipLocationPrivateReaderV4 = null;
+ private Reader ipLocationPrivateReaderV6 = null;
+ private Reader asnV4Reader = null;
+ private Reader asnV6Reader = null;
+
+ /**
+ * Initialize the location databases; the configured file system type decides where the files are read from.
+ */
+ public void loadIpLookup() {
+ try {
+ if (FlowWriteConfig.FILE_SYSTEM_TYPE.equals(FlowWriteConfig.KNOWLEDGEBASE_FILE_SYSTEM_TYPE)) {
+ for (String name : FILES) {
+ byte[] fileBytes = HdfsFileUtils.downloadFileByBytes(FlowWriteConfig.KNOWLEDGEBASE_LIBRARY + name);
+ if (fileBytes != null) {
+ updateIpLookup(name, fileBytes);
+ }
+ }
+ } else {
+ for (String name : FILES) {
+ byte[] fileBytes = new FileReader(FlowWriteConfig.KNOWLEDGEBASE_LIBRARY + name).readBytes();
+ if (fileBytes != null) {
+ updateIpLookup(name, fileBytes);
+ }
+ }
+ }
+
+ } catch (RuntimeException e) {
+ logger.error("IpLookup init data error, please check!");
+ }
+ }
+
+ /**
+ * Update a location database
+ *
+ * @param name file name
+ * @param data file bytes
+ */
+ public void updateIpLookup(String name, byte[] data) {
+ InputStream inputStream = null;
+ try {
+ inputStream = new ByteArrayInputStream(data);
+ switch (name) {
+ case "ip_v4_built_in.mmdb":
+ this.ipLocationPublicReaderV4 = new Reader(inputStream, new CHMCache());
+ break;
+ case "ip_v4_user_defined.mmdb":
+ this.ipLocationPrivateReaderV4 = new Reader(inputStream, new CHMCache());
+ break;
+ case "ip_v6_built_in.mmdb":
+ this.ipLocationPublicReaderV6 = new Reader(inputStream, new CHMCache());
+ break;
+ case "ip_v6_user_defined.mmdb":
+ this.ipLocationPrivateReaderV6 = new Reader(inputStream, new CHMCache());
+ break;
+ case "asn_v4.mmdb":
+ this.asnV4Reader = new Reader(inputStream, new CHMCache());
+ break;
+ case "asn_v6.mmdb":
+ this.asnV6Reader = new Reader(inputStream, new CHMCache());
+ break;
+ }
+ } catch (IOException ioe) {
+ logger.error("The IpLookup database data conversion error, please check!");
+ } catch (RuntimeException e) {
+ logger.error("IpLookup update data error, please check!");
+ } finally {
+ IOUtils.closeQuietly(inputStream);
+ }
+ }
+
+ /**
+ * Look up ASN information
+ *
+ * @param ip IP address
+ * @return ASN info
+ */
+ String asnLookup(String ip) {
+ JsonNode location = getAsn(ip);
+ if (IPUtil.internalIp(ip) || StringUtil.isEmpty(location)) {
+ return UNKNOWN;
+ } else {
+ return StringUtil.setDefaultIfEmpty(location.get("ASN"), UNKNOWN).toString().replace("\"", "");
+ }
+ }
+
+ /**
+ * Look up country/province/city location detail
+ *
+ * @param ip IP address
+ * @return country, province, and city info
+ */
+ String cityLookupDetail(String ip) {
+ if (IPUtil.internalIp(ip)) {
+ return PRIVATE_IP;
+ }
+ JsonNode location = getLocation(ip);
+ if (location == null) {
+ return UNKNOWN;
+ } else {
+ return Joiner.on(LOCATION_SEPARATOR).useForNull("").join(location.get("COUNTRY"),
+ location.get("SUPER_ADMINISTRATIVE_AREA"), location.get("ADMINISTRATIVE_AREA")).replace("\"", "");
+ }
+ }
+
+ /**
+ * Look up country-level location
+ *
+ * @param ip IP address
+ * @return country info
+ */
+ String countryLookup(String ip) {
+ if (IPUtil.internalIp(ip)) {
+ return PRIVATE_IP;
+ }
+ JsonNode location = getLocation(ip);
+ if (location == null) {
+ return UNKNOWN;
+ } else {
+ return StringUtil.setDefaultIfEmpty(location.get("COUNTRY"), UNKNOWN).toString().replace("\"", "");
+ }
+ }
+
+ /**
+ * Fetch location info from the geo location databases
+ *
+ * @param ip IP address
+ * @return geo location info for the IP
+ */
+ private JsonNode getLocation(String ip) {
+ JsonNode jsonNode = null;
+ try {
+ InetAddress ipAddress = InetAddress.getByName(ip);
+
+ if (IPUtil.isIPAddress(ip)) {
+
+ if (IPUtil.isIPv4Address(ip)) {
+ jsonNode = ipLocationPrivateReaderV4.get(ipAddress);
+ if (StringUtil.isEmpty(jsonNode)) {
+ jsonNode = ipLocationPublicReaderV4.get(ipAddress);
+ }
+ } else if (IPUtil.isIPv6Address(ip)) {
+ jsonNode = ipLocationPrivateReaderV6.get(ipAddress);
+ if (StringUtil.isEmpty(jsonNode)) {
+ jsonNode = ipLocationPublicReaderV6.get(ipAddress);
+ }
+ }
+ }
+ } catch (RuntimeException e) {
+ logger.error("ip address :" + ip + ", parser error " + e);
+ } catch (IOException e) {
+ logger.error("IP address " + ip + "of a host could not be determined, parser error " + e);
+ }
+ return jsonNode;
+ }
+
+ /**
+ * Fetch ASN info from the ASN databases
+ *
+ * @param ip IP address
+ * @return ASN info for the IP
+ */
+ private JsonNode getAsn(String ip) {
+ JsonNode jsonNode = null;
+ try {
+ InetAddress ipAddress = InetAddress.getByName(ip);
+
+ if (IPUtil.isIPAddress(ip)) {
+
+ if (IPUtil.isIPv4Address(ip)) {
+ jsonNode = asnV4Reader.get(ipAddress);
+ } else if (IPUtil.isIPv6Address(ip)) {
+ jsonNode = asnV6Reader.get(ipAddress);
+ }
+ }
+ } catch (RuntimeException e) {
+ logger.error("IP address :" + ip + ", parser error " + e);
+ } catch (IOException e) {
+ logger.error("IP address " + ip + "of a host could not be determined, parser error " + e);
+ }
+ return jsonNode;
+ }
+
+
+}
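For reference, a hypothetical usage sketch of the new IpLookup class; it sits in the same package because the lookup methods are package-private, and it assumes the six .mmdb files already exist under knowledgebase.library:

package com.zdjizhi.tools.general;

public class IpLookupDemo {
    public static void main(String[] args) {
        IpLookup ipLookup = new IpLookup();
        // reads the .mmdb files from HDFS or the local path, depending on knowledgebase.file.system.type
        ipLookup.loadIpLookup();

        System.out.println(ipLookup.countryLookup("8.8.8.8"));    // country of a public IP
        System.out.println(ipLookup.cityLookupDetail("8.8.8.8")); // COUNTRY.SUPER_ADMINISTRATIVE_AREA.ADMINISTRATIVE_AREA
        System.out.println(ipLookup.asnLookup("10.0.0.1"));       // private IP -> empty string
    }
}

Unlike the static IpLookupV2 previously built inside TransFunction, each process function now owns its own IpLookup instance and can swap the underlying Reader objects when the broadcast stream delivers new files.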
diff --git a/src/main/java/com/zdjizhi/tools/general/TransFormMap.java b/src/main/java/com/zdjizhi/tools/general/TransForm.java
index 19df653..af218a8 100644
--- a/src/main/java/com/zdjizhi/tools/general/TransFormMap.java
+++ b/src/main/java/com/zdjizhi/tools/general/TransForm.java
@@ -1,9 +1,5 @@
package com.zdjizhi.tools.general;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.tools.json.JsonParseUtil;
import java.util.Map;
@@ -14,42 +10,7 @@ import java.util.Map;
*
* @author qidaijie
*/
-public class TransFormMap {
- private static final Log logger = LogFactory.get();
-
- /**
- * Parse the log and complete it
- *
- * @param jsonMap raw log consumed from the Kafka topic and parsed into a map
- * @return the completed log
- */
- @SuppressWarnings("unchecked")
- public static String dealCommonMessage(Map<String, Object> jsonMap) {
- try {
- JsonParseUtil.dropJsonField(jsonMap);
- for (String[] strings : JsonParseUtil.getJobList()) {
- //该日志字段的值
- Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
- //结果值映射到的日志字段key
- String appendToKey = strings[1];
- //匹配操作函数的字段
- String function = strings[2];
- //额外的参数的值
- String param = strings[3];
-
- //结果值映射到的日志字段原始value
- Object appendToValue = JsonParseUtil.getValue(jsonMap, appendToKey);
-
- functionSet(function, jsonMap, appendToKey, appendToValue, logValue, param);
- }
- return JsonMapper.toJsonString(jsonMap);
- } catch (RuntimeException e) {
- logger.error("TransForm logs failed,The exception is :" + e);
- return null;
- }
- }
-
-
+public class TransForm {
/**
* Set of functions that operate on the corresponding fields according to the schema description
*
@@ -60,7 +21,7 @@ public class TransFormMap {
* @param logValue value of the parameter that is used
* @param param value of the extra parameter
*/
- private static void functionSet(String function, Map<String, Object> jsonMap, String appendToKey, Object appendToValue, Object logValue, String param) {
+ public static void functionSet(String function, Map<String, Object> jsonMap, String appendToKey, Object appendToValue, Object logValue, String param,IpLookup ipLookup) {
switch (function) {
case "current_timestamp":
if (!(appendToValue instanceof Long)) {
@@ -72,17 +33,17 @@ public class TransFormMap {
break;
case "geo_ip_detail":
if (logValue != null && appendToValue == null) {
- JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoIpDetail(logValue.toString()));
+ JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoIpDetail(logValue.toString(),ipLookup));
}
break;
case "geo_asn":
if (logValue != null && appendToValue == null) {
- JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoAsn(logValue.toString()));
+ JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoAsn(logValue.toString(),ipLookup));
}
break;
case "geo_ip_country":
if (logValue != null && appendToValue == null) {
- JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoIpCountry(logValue.toString()));
+ JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoIpCountry(logValue.toString(),ipLookup));
}
break;
case "flattenSpec":
diff --git a/src/main/java/com/zdjizhi/tools/general/TransFormTypeMap.java b/src/main/java/com/zdjizhi/tools/general/TransFormTypeMap.java
deleted file mode 100644
index 4bc2f36..0000000
--- a/src/main/java/com/zdjizhi/tools/general/TransFormTypeMap.java
+++ /dev/null
@@ -1,134 +0,0 @@
-package com.zdjizhi.tools.general;
-
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.tools.json.JsonParseUtil;
-
-import java.util.Map;
-
-
-/**
- * Description: transform/completion utility class
- *
- * @author qidaijie
- */
-public class TransFormTypeMap {
- private static final Log logger = LogFactory.get();
-
- /**
- * Parse the log and complete it
- *
- * @param message raw log from the Kafka topic
- * @return the completed log
- */
- @SuppressWarnings("unchecked")
- public static String dealCommonMessage(Map<String, Object> message) {
- try {
- Map<String, Object> jsonMap = JsonParseUtil.typeTransform(message);
- for (String[] strings : JsonParseUtil.getJobList()) {
- //value of the source log field
- Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]);
- //key of the log field the result is mapped to
- String appendToKey = strings[1];
- //field that selects the transform function
- String function = strings[2];
- //value of the extra parameter
- String param = strings[3];
-
- //original value of the target log field
- Object appendToValue = JsonParseUtil.getValue(jsonMap, appendToKey);
-
- functionSet(function, jsonMap, appendToKey, appendToValue, logValue, param);
- }
- return JsonMapper.toJsonString(jsonMap);
- } catch (RuntimeException e) {
- logger.error("TransForm logs failed,The exception is :" + e);
- e.printStackTrace();
- return null;
- }
- }
-
-
- /**
- * Set of functions that operate on the corresponding fields according to the schema description
- *
- * @param function field that selects the transform function
- * @param jsonMap map parsed from the raw log
- * @param appendToKey key of the field to complete
- * @param appendToValue value of the field to complete
- * @param logValue value of the parameter that is used
- * @param param value of the extra parameter
- */
- private static void functionSet(String function, Map<String, Object> jsonMap, String appendToKey, Object appendToValue, Object logValue, String param) {
- switch (function) {
- case "current_timestamp":
- if (!(appendToValue instanceof Long)) {
- JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getCurrentTime());
- }
- break;
- case "snowflake_id":
- JsonParseUtil.setValue(jsonMap, appendToKey, SnowflakeId.generateId());
- break;
- case "geo_ip_detail":
- if (logValue != null && appendToValue == null) {
- JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoIpDetail(logValue.toString()));
- }
- break;
- case "geo_asn":
- if (logValue != null && appendToValue == null) {
- JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoAsn(logValue.toString()));
- }
- break;
- case "geo_ip_country":
- if (logValue != null && appendToValue == null) {
- JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoIpCountry(logValue.toString()));
- }
- break;
- case "flattenSpec":
- if (logValue != null && param != null) {
- JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.flattenSpec(logValue.toString(), param));
- }
- break;
- case "if":
- if (param != null) {
- JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.condition(jsonMap, param));
- }
- break;
- case "decode_of_base64":
- if (logValue != null) {
- JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param)));
- }
- break;
- case "sub_domain":
- if (appendToValue == null && logValue != null) {
- JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getTopDomain(logValue.toString()));
- }
- break;
- case "radius_match":
- if (logValue != null) {
- JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.radiusMatch(jsonMap, logValue.toString()));
- }
- break;
- case "gtpc_match":
- if (logValue != null) {
- TransFunction.gtpcMatch(jsonMap, logValue.toString(), appendToKey, param);
- }
- break;
- case "set_value":
- if (param != null) {
- JsonParseUtil.setValue(jsonMap, appendToKey, param);
- }
- break;
- case "get_value":
- if (logValue != null) {
- JsonParseUtil.setValue(jsonMap, appendToKey, logValue);
- }
- break;
- default:
- }
- }
-
-} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/tools/general/TransFunction.java b/src/main/java/com/zdjizhi/tools/general/TransFunction.java
index 000ecbb..f5f10e5 100644
--- a/src/main/java/com/zdjizhi/tools/general/TransFunction.java
+++ b/src/main/java/com/zdjizhi/tools/general/TransFunction.java
@@ -7,7 +7,6 @@ import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.tools.hbase.HBaseUtils;
import com.zdjizhi.utils.FormatUtils;
-import com.zdjizhi.utils.IpLookupV2;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.tools.json.JsonParseUtil;
import com.zdjizhi.tools.json.JsonPathUtil;
@@ -25,18 +24,6 @@ class TransFunction {
private static final Log logger = LogFactory.get();
/**
- * IP location database utility
- */
- private static IpLookupV2 ipLookup = new IpLookupV2.Builder(false)
- .loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_built_in.mmdb")
- .loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_built_in.mmdb")
- .loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_user_defined.mmdb")
- .loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_user_defined.mmdb")
- .loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb")
- .loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb")
- .build();
-
- /**
* 生成当前时间戳的操作
*/
static long getCurrentTime() {
@@ -50,7 +37,7 @@ class TransFunction {
* @param ip client IP
* @return detailed location info for the IP address
*/
- static String getGeoIpDetail(String ip) {
+ static String getGeoIpDetail(String ip,IpLookup ipLookup) {
String detail = "";
try {
detail = ipLookup.cityLookupDetail(ip);
@@ -68,7 +55,7 @@ class TransFunction {
* @param ip client/server IP
* @return ASN
*/
- static String getGeoAsn(String ip) {
+ static String getGeoAsn(String ip,IpLookup ipLookup) {
String asn = "";
try {
asn = ipLookup.asnLookup(ip);
@@ -86,7 +73,7 @@ class TransFunction {
* @param ip server IP
* @return country
*/
- static String getGeoIpCountry(String ip) {
+ static String getGeoIpCountry(String ip,IpLookup ipLookup) {
String country = "";
try {
country = ipLookup.countryLookup(ip);
@@ -98,7 +85,6 @@ class TransFunction {
return country;
}
-
/**
* Complete radius info with the help of HBase
*
@@ -117,7 +103,7 @@ class TransFunction {
/**
* 借助HBase补齐GTP-C信息,解析tunnels信息,优先使用gtp_uplink_teid,其次使用gtp_downlink_teid
* <p>
- * "common_tunnels":[{"tunnels_schema_type":"GTP","gtp_uplink_teid":235261261,"gtp_downlink_teid":665547833,"gtp_sgw_ip":"192.56.5.2","gtp_pgw_ip":"192.56.10.20","gtp_sgw_port":2152,"gtp_pgw_port":2152}]
+ * "common_tunnels":[{"tunnels_schema_type":"GTP","gtp_endpoint_a2b_teid":235261261,"gtp_endpoint_b2a_teid":665547833,"gtp_sgw_ip":"192.56.5.2","gtp_pgw_ip":"192.56.10.20","gtp_sgw_port":2152,"gtp_pgw_port":2152}]
*
* @param jsonMap raw log JSON
* @param logValue uplink TEID
diff --git a/src/main/java/com/zdjizhi/tools/hdfs/HdfsFileUtils.java b/src/main/java/com/zdjizhi/tools/hdfs/HdfsFileUtils.java
new file mode 100644
index 0000000..6c56b77
--- /dev/null
+++ b/src/main/java/com/zdjizhi/tools/hdfs/HdfsFileUtils.java
@@ -0,0 +1,78 @@
+package com.zdjizhi.tools.hdfs;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.FlowWriteConfig;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.tools.hdfs
+ * @Description:
+ * @date 2022/11/217:57
+ */
+public class HdfsFileUtils {
+ private static final Log logger = LogFactory.get();
+
+
+ private static HdfsFileUtils hdfsFileUtils;
+ private static FileSystem fileSystem;
+
+ static {
+ if (FlowWriteConfig.FILE_SYSTEM_TYPE.equals(FlowWriteConfig.KNOWLEDGEBASE_FILE_SYSTEM_TYPE)) {
+ Configuration configuration = new Configuration();
+ try {
+ //create the FileSystem used to connect to HDFS
+ fileSystem = FileSystem.get(new URI(FlowWriteConfig.HDFS_URI), configuration);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (URISyntaxException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ /**
+ * Download a file from HDFS
+ *
+ * @param filePath file path
+ * @return file bytes
+ * @throws IOException
+ */
+ public static byte[] downloadFileByBytes(String filePath) {
+ try (FSDataInputStream open = fileSystem.open(new Path(filePath))) {
+ byte[] bytes = new byte[open.available()];
+ open.read(0, bytes, 0, open.available());
+ return bytes;
+ } catch (IOException e) {
+ logger.error("An I/O exception when files are download from HDFS. Message is :" + e.getMessage());
+ }
+ return null;
+ }
+
+ /**
+ * Upload a file to HDFS
+ *
+ * @param filePath file path
+ * @param bytes file bytes
+ * @throws IOException
+ */
+ public static void uploadFileByBytes(String filePath, byte[] bytes) {
+ try (FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path(filePath), true)) {
+ fsDataOutputStream.write(bytes);
+ } catch (RuntimeException e) {
+ logger.error("Uploading files to the HDFS is abnormal. Message is :" + e.getMessage());
+ } catch (IOException e) {
+ logger.error("An I/O exception when files are uploaded to HDFS. Message is :" + e.getMessage());
+ }
+ }
+}
+
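A hypothetical round trip with the new HDFS helper; the path is illustrative, and the sketch assumes knowledgebase.file.system.type=hdfs, since that is the condition under which the static block opens the FileSystem:

import com.zdjizhi.tools.hdfs.HdfsFileUtils;

import java.nio.charset.StandardCharsets;

public class HdfsFileUtilsDemo {
    public static void main(String[] args) {
        String path = "/qitest/ip_v4_built_in.mmdb";            // illustrative HDFS path
        byte[] payload = "demo-bytes".getBytes(StandardCharsets.UTF_8);

        HdfsFileUtils.uploadFileByBytes(path, payload);         // create or overwrite the file on HDFS
        byte[] readBack = HdfsFileUtils.downloadFileByBytes(path);
        System.out.println(readBack == null ? "download failed" : readBack.length + " bytes read back");
    }
}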
diff --git a/src/main/java/com/zdjizhi/tools/http/HttpClientUtils.java b/src/main/java/com/zdjizhi/tools/http/HttpClientUtils.java
new file mode 100644
index 0000000..4c4f0ea
--- /dev/null
+++ b/src/main/java/com/zdjizhi/tools/http/HttpClientUtils.java
@@ -0,0 +1,320 @@
+package com.zdjizhi.tools.http;
+
+
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.*;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.HttpRequestRetryHandler;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.http.conn.ConnectionKeepAliveStrategy;
+import org.apache.http.conn.HttpHostConnectException;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.message.BasicHeaderElementIterator;
+import org.apache.http.protocol.HTTP;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLHandshakeException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+
+public class HttpClientUtils {
+ /**
+ * Global connection pool manager
+ */
+ private static final PoolingHttpClientConnectionManager CONN_MANAGER = new PoolingHttpClientConnectionManager();
+
+ private static Logger logger = LoggerFactory.getLogger(HttpClientUtils.class);
+ public static final String ERROR_MESSAGE = "-1";
+
+ /*
* Static block that configures the connection pool
+ */
+ static {
+ // maximum total connections
+ CONN_MANAGER.setMaxTotal(FlowWriteConfig.HTTP_POOL_MAX_CONNECTION);
+ // maximum connections per route
+ CONN_MANAGER.setDefaultMaxPerRoute(FlowWriteConfig.HTTP_POOL_MAX_PER_ROUTE);
+
+ }
+
+ /**
+ * Build an HTTP client backed by the connection pool
+ *
+ * @return HTTP client instance
+ */
+ private static CloseableHttpClient getHttpClient() {
+ // request configuration parameters
+ RequestConfig requestConfig = RequestConfig.custom()
+ // timeout for obtaining a connection from the pool
+ .setConnectionRequestTimeout(FlowWriteConfig.HTTP_POOL_REQUEST_TIMEOUT)
+ // connect timeout
+ .setConnectTimeout(FlowWriteConfig.HTTP_POOL_CONNECT_TIMEOUT)
+ // socket (response) timeout
+ .setSocketTimeout(FlowWriteConfig.HTTP_POOL_RESPONSE_TIMEOUT)
+ .build();
+
+ /*
+ * Retry handler, added in case the timeouts do not take effect
+ * Returning false outright would disable retries
+ * Here the decision to retry is made per exception type
+ */
+ HttpRequestRetryHandler retry = (exception, executionCount, context) -> {
+ if (executionCount >= 3) {// give up after three retries
+ return false;
+ }
+ if (exception instanceof NoHttpResponseException) {// retry if the server dropped the connection
+ return true;
+ }
+ if (exception instanceof SSLHandshakeException) {// do not retry SSL handshake failures
+ return false;
+ }
+ if (exception instanceof UnknownHostException) {// target host unreachable
+ return false;
+ }
+ if (exception instanceof ConnectTimeoutException) {// connect timed out
+ return false;
+ }
+ if (exception instanceof HttpHostConnectException) {// connection refused
+ return false;
+ }
+ if (exception instanceof SSLException) {// SSL error
+ return false;
+ }
+ if (exception instanceof InterruptedIOException) {// timeout, retry
+ return true;
+ }
+ HttpClientContext clientContext = HttpClientContext.adapt(context);
+ HttpRequest request = clientContext.getRequest();
+ // retry if the request is idempotent
+ return !(request instanceof HttpEntityEnclosingRequest);
+ };
+
+
+ ConnectionKeepAliveStrategy myStrategy = (response, context) -> {
+ HeaderElementIterator it = new BasicHeaderElementIterator
+ (response.headerIterator(HTTP.CONN_KEEP_ALIVE));
+ while (it.hasNext()) {
+ HeaderElement he = it.nextElement();
+ String param = he.getName();
+ String value = he.getValue();
+ if (value != null && "timeout".equalsIgnoreCase(param)) {
+ return Long.parseLong(value) * 1000;
+ }
+ }
+ return 60 * 1000;//default to 60s if the server does not negotiate a keep-alive
+ };
+
+ // build the httpClient
+ return HttpClients.custom()
+ // apply the request timeouts to the client
+ .setDefaultRequestConfig(requestConfig)
+ // apply the retry handler
+ .setRetryHandler(retry)
+ .setKeepAliveStrategy(myStrategy)
+ // attach the pooled connection manager
+ .setConnectionManager(CONN_MANAGER)
+ .build();
+ }
+
+
+ /**
+ * GET request
+ *
+ * @param uri request URI
+ * @return message
+ */
+ public static String httpGet(URI uri, Header... headers) {
+ String msg = ERROR_MESSAGE;
+
+ // obtain the HTTP client
+ CloseableHttpClient httpClient = getHttpClient();
+ CloseableHttpResponse response = null;
+
+ try {
+ logger.info("http get uri {}", uri);
+ // build the GET request
+ HttpGet httpGet = new HttpGet(uri);
+
+ if (StringUtil.isNotEmpty(headers)) {
+ for (Header h : headers) {
+ httpGet.addHeader(h);
+ logger.info("request header : {}", h);
+ }
+ }
+ // execute the request
+ response = httpClient.execute(httpGet);
+ int statusCode = response.getStatusLine().getStatusCode();
+ // get the response entity
+ HttpEntity entity = response.getEntity();
+ // read the response body
+ msg = EntityUtils.toString(entity, "UTF-8");
+
+ if (statusCode != HttpStatus.SC_OK) {
+ logger.error("Http get content is :{}", msg);
+ }
+
+ } catch (ClientProtocolException e) {
+ logger.error("协议错误: {}", e.getMessage());
+ } catch (ParseException e) {
+ logger.error("解析错误: {}", e.getMessage());
+ } catch (IOException e) {
+ logger.error("IO错误: {}", e.getMessage());
+ } finally {
+ if (null != response) {
+ try {
+ EntityUtils.consume(response.getEntity());
+ response.close();
+ } catch (IOException e) {
+ logger.error("释放链接错误: {}", e.getMessage());
+
+ }
+ }
+ }
+
+ return msg;
+ }
+
+ /**
+ * POST request
+ *
+ * @param uri request URI
+ * @param requestBody request body
+ * @return POST response body
+ */
+ public static String httpPost(URI uri, String requestBody, Header... headers) {
+ String msg = ERROR_MESSAGE;
+ // obtain the HTTP client
+ CloseableHttpClient httpClient = getHttpClient();
+
+ // build the POST request
+ CloseableHttpResponse response = null;
+ try {
+
+ logger.info("http post uri:{}, http post body:{}", uri, requestBody);
+
+ HttpPost httpPost = new HttpPost(uri);
+ httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded");
+ if (StringUtil.isNotEmpty(headers)) {
+ for (Header h : headers) {
+ httpPost.addHeader(h);
+ logger.info("request header : {}", h);
+ }
+ }
+
+ if (StringUtil.isNotBlank(requestBody)) {
+ byte[] bytes = requestBody.getBytes(StandardCharsets.UTF_8);
+ httpPost.setEntity(new ByteArrayEntity(bytes));
+ }
+
+ response = httpClient.execute(httpPost);
+ int statusCode = response.getStatusLine().getStatusCode();
+ // get the response entity
+ HttpEntity entity = response.getEntity();
+ // read the response body
+ msg = EntityUtils.toString(entity, "UTF-8");
+
+ if (statusCode != HttpStatus.SC_OK) {
+ logger.error("Http post content is :{}", msg);
+ }
+ } catch (ClientProtocolException e) {
+ logger.error("协议错误: {}", e.getMessage());
+ } catch (ParseException e) {
+ logger.error("解析错误: {}", e.getMessage());
+ } catch (IOException e) {
+ logger.error("IO错误: {}", e.getMessage());
+ } finally {
+ if (null != response) {
+ try {
+ EntityUtils.consumeQuietly(response.getEntity());
+ response.close();
+ } catch (IOException e) {
+ logger.error("释放链接错误: {}", e.getMessage());
+
+ }
+ }
+ }
+ return msg;
+ }
+
+ /**
+ * Assemble the URL
+ * from the base url, path, and parameter map
+ */
+ public static void setUrlWithParams(URIBuilder uriBuilder, String path, Map<String, Object> params) {
+ try {
+ uriBuilder.setPath(path);
+ if (params != null && !params.isEmpty()) {
+ for (Map.Entry<String, Object> kv : params.entrySet()) {
+ uriBuilder.setParameter(kv.getKey(), kv.getValue().toString());
+ }
+ }
+ } catch (Exception e) {
+ logger.error("拼接url出错,uri : {}, path : {},参数: {}", uriBuilder.toString(), path, params);
+ }
+ }
+
+
+ public InputStream httpGetInputStream(String url, int socketTimeout, Header... headers) {
+ InputStream result = null;
+ // obtain the HTTP client
+ CloseableHttpClient httpClient = getHttpClient();
+ // build the GET request
+ HttpGet httpGet = new HttpGet(url);
+ if (StringUtil.isNotEmpty(headers)) {
+ for (Header h : headers) {
+ httpGet.addHeader(h);
+ }
+ }
+ CloseableHttpResponse response = null;
+
+ try {
+ // execute the request
+ response = httpClient.execute(httpGet);
+ // buffer the response entity
+ result = IOUtils.toBufferedInputStream(response.getEntity().getContent());
+ // consume the remaining response content
+ EntityUtils.consume(response.getEntity());
+ } catch (ClientProtocolException e) {
+ logger.error("current file: {},Protocol error:{}", url, e.getMessage());
+
+ } catch (ParseException e) {
+ logger.error("current file: {}, Parser error:{}", url, e.getMessage());
+
+ } catch (IOException e) {
+ logger.error("current file: {},IO error:{}", url, e.getMessage());
+
+ } finally {
+ if (null != response) {
+ try {
+ EntityUtils.consume(response.getEntity());
+ response.close();
+ } catch (IOException e) {
+ logger.error("Release Connection error:{}", e.getMessage());
+
+ }
+ }
+ }
+ return result;
+ }
+
+} \ No newline at end of file
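The only instance method, httpGetInputStream, is what loadKnowledge uses to pull a knowledge-base file from HOS with the token header. A hypothetical call (the URL is a placeholder; the token value mirrors the hos.token property above):

import com.zdjizhi.tools.http.HttpClientUtils;
import org.apache.commons.io.IOUtils;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;

import java.io.IOException;
import java.io.InputStream;

public class HttpClientUtilsDemo {
    public static void main(String[] args) throws IOException {
        Header token = new BasicHeader("token", "c21f969b5f03d33d43e04f8f136e7682");
        InputStream in = new HttpClientUtils()
                .httpGetInputStream("http://hos.example/knowledgebase/ip_v4_built_in.mmdb", 3000, token);
        if (in != null) {
            byte[] bytes = IOUtils.toByteArray(in);              // same pattern as loadKnowledge
            System.out.println("downloaded " + bytes.length + " bytes");
            IOUtils.closeQuietly(in);
        }
    }
}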
diff --git a/src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java
index 59365be..002b117 100644
--- a/src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java
+++ b/src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java
@@ -9,7 +9,6 @@ import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
-import com.jayway.jsonpath.JsonPath;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.StringUtil;
@@ -55,8 +54,8 @@ public class JsonParseUtil {
propNacos.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN);
try {
ConfigService configService = NacosFactory.createConfigService(propNacos);
- String dataId = FlowWriteConfig.NACOS_DATA_ID;
- String group = FlowWriteConfig.NACOS_GROUP;
+ String dataId = FlowWriteConfig.NACOS_SCHEMA_DATA_ID;
+ String group = FlowWriteConfig.NACOS_SCHEMA_GROUP;
String schema = configService.getConfig(dataId, group, 5000);
if (StringUtil.isNotBlank(schema)) {
jsonFieldsMap = getFieldsFromSchema(schema);
@@ -263,7 +262,6 @@ public class JsonParseUtil {
dropList.add(filedStr);
}
}
- System.out.println(map.toString());
return map;
}
diff --git a/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java
index ec75b8a..3933ee8 100644
--- a/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java
+++ b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java
@@ -34,7 +34,7 @@ public class KafkaConsumer {
* @return kafka logs -> map
*/
@SuppressWarnings("unchecked")
- public static FlinkKafkaConsumer<Map<String, Object>> myDeserializationConsumer() {
+ public static FlinkKafkaConsumer<Map<String, Object>> timestampDeserializationConsumer() {
FlinkKafkaConsumer<Map<String, Object>> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC,
new TimestampDeserializationSchema(), createConsumerConfig());
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index 5581137..685cc0a 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -2,17 +2,21 @@ package com.zdjizhi.topology;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.CustomFile;
import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.tools.functions.FilterNullFunction;
-import com.zdjizhi.tools.functions.MapCompletedFunction;
-import com.zdjizhi.tools.functions.TypeMapCompletedFunction;
+import com.zdjizhi.tools.functions.map.MapCompleted;
+import com.zdjizhi.tools.functions.map.TypeMapCompleted;
+import com.zdjizhi.tools.functions.broadcast.KnowledgeBaseSource;
+import com.zdjizhi.tools.functions.filter.FilterNull;
import com.zdjizhi.tools.kafka.KafkaConsumer;
import com.zdjizhi.tools.kafka.KafkaProducer;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import java.util.List;
import java.util.Map;
/**
@@ -29,43 +33,56 @@ public class LogFlowWriteTopology {
//maximum time between two output flushes (in milliseconds)
environment.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT);
-
if (FlowWriteConfig.LOG_NEED_COMPLETE == 1) {
- SingleOutputStreamOperator<Map<String, Object>> streamSource = environment.addSource(KafkaConsumer.myDeserializationConsumer())
- .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM).name(FlowWriteConfig.SOURCE_KAFKA_TOPIC);
+ //Broadcast stream source, parallelism fixed at 1
+ DataStreamSource<List<CustomFile>> customFileDataStreamSource = environment.addSource(new KnowledgeBaseSource())
+ .setParallelism(1);
+
+ //MapState descriptor used to store the broadcast data
+ MapStateDescriptor<String, List<CustomFile>> descriptor = new MapStateDescriptor<>("descriptor", Types.STRING, Types.LIST(TypeInformation.of(CustomFile.class)));
+
+ //Broadcast the stream so its data is cached in broadcast state
+ BroadcastStream<List<CustomFile>> broadcast = customFileDataStreamSource.broadcast(descriptor);
- DataStream<String> cleaningLog;
+ //Connect the business (log) stream with the broadcast stream
+ BroadcastConnectedStream<Map<String, Object>, List<CustomFile>> connect = environment.addSource(KafkaConsumer.timestampDeserializationConsumer())
+ .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM)
+ .name(FlowWriteConfig.SOURCE_KAFKA_TOPIC)
+ .connect(broadcast);
+
+ //Log cleaning and completion
+ SingleOutputStreamOperator<String> completed;
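+ //LOG_TRANSFORM_TYPE selects the processing function: 0 = completion without field-type validation, 1 or any other value = weak validation with schema-based casting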
switch (FlowWriteConfig.LOG_TRANSFORM_TYPE) {
case 0:
//Process, complete and transform the raw log without validating field types.
- cleaningLog = streamSource.map(new MapCompletedFunction()).name("MapCompletedFunction")
+ completed = connect.process(new MapCompleted())
+ .name("MapCompletedFunction")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
break;
case 1:
//Process, complete and transform the raw log with weak field-type validation; fields can be cast according to the schema.
- cleaningLog = streamSource.map(new TypeMapCompletedFunction()).name("TypeMapCompletedFunction")
+ completed = connect.process(new TypeMapCompleted())
+ .name("TypeMapCompletedFunction")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
break;
default:
- //对原始日志进行处理补全转换等,不对日志字段类型做校验。
- cleaningLog = streamSource.map(new MapCompletedFunction()).name("MapCompletedFunction")
+ //Process, complete and transform the raw log with weak field-type validation; fields can be cast according to the schema.
+ completed = connect.process(new TypeMapCompleted())
+ .name("TypeMapCompletedFunction")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
}
- //过滤空数据不发送到Kafka内
- DataStream<String> result = cleaningLog.filter(new FilterNullFunction()).name("FilterAbnormalData")
- .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
-
//Send the data to Kafka
- result.addSink(KafkaProducer.getKafkaProducer()).name(FlowWriteConfig.SINK_KAFKA_TOPIC)
+ completed.addSink(KafkaProducer.getKafkaProducer()).name(FlowWriteConfig.SINK_KAFKA_TOPIC)
.setParallelism(FlowWriteConfig.SINK_PARALLELISM);
} else {
+ //No processing; just forward the raw log to Kafka.
DataStreamSource<String> streamSource = environment.addSource(KafkaConsumer.flinkConsumer())
.setParallelism(FlowWriteConfig.SOURCE_PARALLELISM);
//Filter out empty records so they are not sent to Kafka
- DataStream<String> result = streamSource.filter(new FilterNullFunction()).name("FilterOriginalData")
+ DataStream<String> result = streamSource.filter(new FilterNull()).name("FilterOriginalData")
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
//Send the data to Kafka
diff --git a/src/main/logback.xml b/src/main/logback.xml
deleted file mode 100644
index 59095f6..0000000
--- a/src/main/logback.xml
+++ /dev/null
@@ -1,42 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<configuration>
-
- <!-- 格式化输出:%date表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度 %msg:日志消息,%n是换行符-->
- <property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" />
- <!-- 定义日志存储的路径,不要配置相对路径 -->
- <property name="LOG_FILE_PATH" value="E:/logs/demo.%d{yyyy-MM-dd}.%i.log" />
-
- <!-- 控制台输出日志 -->
- <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <!-- 按照上面配置的LOG_PATTERN来打印日志 -->
- <pattern>${LOG_PATTERN}</pattern>
- </encoder>
- </appender>
-
- <!--每天生成一个日志文件,保存30天的日志文件。rollingFile是用来切分文件的 -->
- <appender name="FILE"
- class="ch.qos.logback.core.rolling.RollingFileAppender">
- <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${LOG_FILE_PATH}</fileNamePattern>
- <!-- keep 15 days' worth of history -->
- <maxHistory>30</maxHistory>
- <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
- <!-- 日志文件的最大大小 -->
- <maxFileSize>20MB</maxFileSize>
- </timeBasedFileNamingAndTriggeringPolicy>
- </rollingPolicy>
-
- <encoder>
- <pattern>${LOG_PATTERN}</pattern>
- </encoder>
- </appender>
- <!-- project default level项目输出的日志级别 -->
- <logger name="com.example.demo" level="INFO" />
-
- <!-- 日志输出级别 常用的日志级别按照从高到低依次为:ERROR、WARN、INFO、DEBUG。 -->
- <root level="INFO">
- <appender-ref ref="CONSOLE" />
- <appender-ref ref="FILE" /><!--对应appender name="FILE"。 -->
- </root>
-</configuration> \ No newline at end of file
diff --git a/src/test/java/com/zdjizhi/EncryptorTest.java b/src/test/java/com/zdjizhi/EncryptorTest.java
deleted file mode 100644
index 9bd8e71..0000000
--- a/src/test/java/com/zdjizhi/EncryptorTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package com.zdjizhi;
-
-import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
-import org.junit.Test;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi
- * @Description:
- * @date 2022/3/1610:55
- */
-public class EncryptorTest {
-
-
- @Test
- public void passwordTest(){
- StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
- // 配置加密解密的密码/salt值
- encryptor.setPassword("galaxy");
- // 对"raw_password"进行加密:S5kR+Y7CI8k7MaecZpde25yK8NKUnd6p
- String pin = "galaxy2019";
- String encPin = encryptor.encrypt(pin);
- String user = "admin";
- String encUser = encryptor.encrypt(user);
- System.out.println(encPin);
- System.out.println(encUser);
- // 再进行解密:raw_password
- String rawPwd = encryptor.decrypt("ENC(6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ)");
- String rawUser = encryptor.decrypt("ENC(nnasyGpHKGFA4KW0zro9MDdw==)");
-
- System.out.println("The username is: "+rawPwd);
- System.out.println("The pin is: "+rawUser);
- }
-
-}
diff --git a/src/test/java/com/zdjizhi/FunctionTest.java b/src/test/java/com/zdjizhi/FunctionTest.java
deleted file mode 100644
index c667224..0000000
--- a/src/test/java/com/zdjizhi/FunctionTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package com.zdjizhi;
-
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.IpLookupV2;
-import com.zdjizhi.utils.general.CityHash;
-import org.junit.Test;
-
-import java.math.BigInteger;
-import java.util.Calendar;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi
- * @Description:
- * @date 2021/11/611:38
- */
-public class FunctionTest {
-
- private static IpLookupV2 ipLookup = new IpLookupV2.Builder(false)
- .loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4.mmdb")
-// .loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_built_in.mmdb")
-// .loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_user_defined.mmdb")
-// .loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_user_defined.mmdb")
- .loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb")
- .loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb")
- .build();
-
- @Test
- public void CityHashTest() {
-
- byte[] dataBytes = String.valueOf(613970406986188816L).getBytes();
- long hashValue = CityHash.CityHash64(dataBytes, 0, dataBytes.length);
- String decimalValue = Long.toUnsignedString(hashValue, 10);
- BigInteger result = new BigInteger(decimalValue);
- System.out.println(result);
- }
-
- @Test
- public void ipLookupTest() {
- String ip = "0.255.255.254";
- System.out.println(ipLookup.cityLookupDetail(ip));
- System.out.println(ipLookup.countryLookup(ip));
- }
-
- @Test
- public void timestampTest(){
- Calendar cal = Calendar.getInstance();
- Long utcTime=cal.getTimeInMillis();
- System.out.println(utcTime);
- System.out.println(System.currentTimeMillis());
- }
-}
diff --git a/src/test/java/com/zdjizhi/HBaseTest.java b/src/test/java/com/zdjizhi/HBaseTest.java
deleted file mode 100644
index 5f94e32..0000000
--- a/src/test/java/com/zdjizhi/HBaseTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package com.zdjizhi;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi
- * @Description:
- * @date 2021/12/310:42
- */
-public class HBaseTest {
-
- @Test
- public void getColumn() {
- // 管理Hbase的配置信息
- Configuration configuration = HBaseConfiguration.create();
- // 设置zookeeper节点
- configuration.set("hbase.zookeeper.quorum", "192.168.44.11:2181");
- configuration.set("hbase.client.retries.number", "3");
- configuration.set("hbase.bulkload.retries.number", "3");
- configuration.set("zookeeper.recovery.retry", "3");
- try {
- Connection connection = ConnectionFactory.createConnection(configuration);
- Table table = connection.getTable(TableName.valueOf("tsg_galaxy:relation_framedip_account"));
- Scan scan2 = new Scan();
- ResultScanner scanner = table.getScanner(scan2);
- for (Result result : scanner) {
- int acctStatusType;
- boolean hasType = result.containsColumn(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"));
- if (hasType) {
- acctStatusType = Bytes.toInt(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")));
- } else {
- acctStatusType = 3;
- }
- String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip")));
- String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account")));
- System.out.println("status" + acctStatusType + "key:" + framedIp + "value:" + account);
-// System.out.println(Arrays.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"))));
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-}
diff --git a/src/test/java/com/zdjizhi/function/Base64Test.java b/src/test/java/com/zdjizhi/function/Base64Test.java
new file mode 100644
index 0000000..891b5af
--- /dev/null
+++ b/src/test/java/com/zdjizhi/function/Base64Test.java
@@ -0,0 +1,77 @@
+package com.zdjizhi.function;
+
+import cn.hutool.core.codec.Base64;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.utils.StringUtil;
+import org.junit.Test;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.function
+ * @Description:
+ * @date 2022/11/39:36
+ */
+public class Base64Test {
+ private static final Log logger = LogFactory.get();
+
+ /**
+ * Decode base64 strings using hutool and print the decoded values.
+ */
+ @Test
+ public void decodeBase64Hutool() {
+ try {
+ System.out.println(Base64.decodeStr("bWFpbF90ZXN0X2VuZ2xpc2gudHh0"));
+ System.out.println(Base64.decodeStr("aGVsbG8="));
+ } catch (RuntimeException e) {
+ logger.error("Resolve Base64 exception, exception information:" + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
+
+ /**
+ * Encode a string to URL-safe base64 with different charsets and print the encoded values.
+ */
+ @Test
+ public void encodeBase64() {
+ try {
+ System.out.println(java.util.Base64.getUrlEncoder().encodeToString("runoob?java8".getBytes("ISO-8859-1")));
+ System.out.println(java.util.Base64.getUrlEncoder().encodeToString("runoob?java8".getBytes("utf-8")));
+
+ } catch (RuntimeException e) {
+ logger.error("Resolve Base64 exception, exception information:" + e.getMessage());
+ e.printStackTrace();
+ } catch (UnsupportedEncodingException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Decode a base64 string using java.util.Base64 and print the decoded values.
+ */
+ @Test
+ public void decodeBase64() {
+ try {
+ byte[] base64decodedBytes = java.util.Base64.getDecoder().decode("bWFpbF90ZXN0X2VuZ2xpc2gudHh0");
+
+ System.out.println("原始字符串: " + new String(base64decodedBytes, "utf-8"));
+ System.out.println("原始字符串: " + new String(base64decodedBytes));
+
+ } catch (RuntimeException e) {
+ logger.error("Resolve Base64 exception, exception information:" + e.getMessage());
+ e.printStackTrace();
+ } catch (UnsupportedEncodingException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/src/test/java/com/zdjizhi/function/EncryptorTest.java b/src/test/java/com/zdjizhi/function/EncryptorTest.java
new file mode 100644
index 0000000..d00a62f
--- /dev/null
+++ b/src/test/java/com/zdjizhi/function/EncryptorTest.java
@@ -0,0 +1,38 @@
+package com.zdjizhi.function;
+
+import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
+import org.junit.Test;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi
+ * @Description:
+ * @date 2022/3/1610:55
+ */
+public class EncryptorTest {
+
+
+ @Test
+ public void passwordTest(){
+ StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
+ // Configure the password/salt used for encryption and decryption
+ encryptor.setPassword("galaxy");
+ // Encrypt the raw Kafka and Nacos credentials
+ String kafkaUser = encryptor.encrypt("admin");
+ String kafkaPin = encryptor.encrypt("galaxy2019");
+ String nacosPin = encryptor.encrypt("nacos");
+ String nacosUser = encryptor.encrypt("nacos");
+
+ System.out.println("Kafka:\n"+"The username is: "+kafkaUser);
+ System.out.println("The pin is: "+kafkaPin);
+ System.out.println("Nacos:\n"+"The username is: "+nacosUser);
+ System.out.println("The pin is: "+nacosPin);
+ // Decrypt them back to the raw values
+ System.out.println("Kafka:\n"+"The username is: "+encryptor.decrypt(kafkaUser));
+ System.out.println("The pin is: "+encryptor.decrypt(kafkaPin));
+
+ System.out.println("Nacos:\n"+"The username is: "+encryptor.decrypt(nacosUser));
+ System.out.println("The pin is: "+encryptor.decrypt(nacosPin));
+ }
+
+}
diff --git a/src/test/java/com/zdjizhi/function/HBaseTest.java b/src/test/java/com/zdjizhi/function/HBaseTest.java
new file mode 100644
index 0000000..bfdcdf9
--- /dev/null
+++ b/src/test/java/com/zdjizhi/function/HBaseTest.java
@@ -0,0 +1,245 @@
+package com.zdjizhi.function;
+
+import cn.hutool.json.JSONObject;
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.tools.hbase.HBaseUtils;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi
+ * @Description:
+ * @date 2021/12/310:42
+ */
+public class HBaseTest {
+ private static final Log logger = LogFactory.get();
+ private static Map<String, String> radiusMap = new ConcurrentHashMap<>(16);
+
+ private static Map<String,HashMap<String, Object>> gtpcMap = new ConcurrentHashMap<>(16);
+
+ @Test
+ public void getColumn() {
+ // HBase configuration
+ Configuration configuration = HBaseConfiguration.create();
+ // Set the zookeeper quorum
+ configuration.set("hbase.zookeeper.quorum", "192.168.44.111:2181");
+ configuration.set("hbase.client.retries.number", "1");
+ configuration.set("hbase.client.pause", "50");
+ configuration.set("hbase.rpc.timeout", "3000");
+ configuration.set("zookeeper.recovery.retry", "1");
+ configuration.set("zookeeper.recovery.retry.intervalmill", "200");
+ try {
+ System.out.println(System.currentTimeMillis());
+ Connection connection = ConnectionFactory.createConnection(configuration);
+ Table table = connection.getTable(TableName.valueOf("tsg_galaxy:relation_framedip_account"));
+ Scan scan2 = new Scan();
+ ResultScanner scanner = table.getScanner(scan2);
+ for (Result result : scanner) {
+ int acctStatusType;
+ boolean hasType = result.containsColumn(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"));
+ if (hasType) {
+ acctStatusType = Bytes.toInt(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")));
+ } else {
+ acctStatusType = 3;
+ }
+ String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip")));
+ String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account")));
+ System.out.println("status" + acctStatusType + "key:" + framedIp + "value:" + account);
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }finally {
+ System.out.println(System.currentTimeMillis());
+ }
+ }
+
+
+ @Test
+ public void getGtpcData() {
+ // HBase configuration
+ Configuration configuration = HBaseConfiguration.create();
+ // Set the zookeeper quorum
+ configuration.set("hbase.zookeeper.quorum", "192.168.44.11:2181");
+ configuration.set("hbase.client.retries.number", "1");
+ configuration.set("hbase.client.pause", "50");
+ configuration.set("hbase.rpc.timeout", "3000");
+ configuration.set("zookeeper.recovery.retry", "1");
+ configuration.set("zookeeper.recovery.retry.intervalmill", "200");
+ long begin = System.currentTimeMillis();
+ ResultScanner scanner = null;
+ try {
+ Connection connection = ConnectionFactory.createConnection(configuration);
+ Table table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_GTPC_TABLE_NAME));
+ Scan scan2 = new Scan();
+ scanner = table.getScanner(scan2);
+ for (Result result : scanner) {
+ String upLinkTeid = getTeid(result, "uplink_teid");
+ String downLinkTeid = getTeid(result, "downlink_teid");
+ String phoneNumber = getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "phone_number").trim();
+ String imsi = getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imsi").trim();
+ String imei = getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imei").trim();
+ Long lastUpdateTime = getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "last_update_time");
+
+ HashMap<String, Object> buildUserData = buildUserData(phoneNumber, imsi, imei, lastUpdateTime);
+
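+ //In the default relationship model the cache key is teid + vsys_id; otherwise the bare teid is used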
+ if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) {
+ String vsysId = getVsysId(result).trim();
+ updateCache(gtpcMap, upLinkTeid+vsysId, buildUserData, lastUpdateTime);
+ updateCache(gtpcMap, downLinkTeid+vsysId, buildUserData, lastUpdateTime);
+ } else {
+ updateCache(gtpcMap, upLinkTeid, buildUserData, lastUpdateTime);
+ updateCache(gtpcMap, downLinkTeid, buildUserData, lastUpdateTime);
+ }
+ }
+ logger.warn("The obtain the number of GTP-C relationships : " + gtpcMap.size());
+ logger.warn("The time spent to obtain GTP-C relationships : " + (System.currentTimeMillis() - begin));
+ } catch (IOException | RuntimeException e) {
+ logger.error("The relationship between USER and TEID obtained from HBase is abnormal! message is :" + e);
+ e.printStackTrace();
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ }
+
+ for (String key : gtpcMap.keySet()){
+ System.out.println(key +"---"+gtpcMap.get(key));
+ }
+ }
+
+ /**
+ * Get a String value from an HBase result.
+ *
+ * @param result result set
+ * @param familyName column family name
+ * @param columnName column name
+ * @return the value, or an empty string if the column is missing or blank
+ */
+ private static String getString(Result result, String familyName, String columnName) {
+ byte[] familyBytes = Bytes.toBytes(familyName);
+ byte[] columnBytes = Bytes.toBytes(columnName);
+ boolean contains = result.containsColumn(familyBytes, columnBytes);
+ if (contains) {
+ String data = Bytes.toString(result.getValue(familyBytes, columnBytes)).trim();
+ if (StringUtil.isNotBlank(data)) {
+ return data;
+ }
+ }
+
+ return "";
+ }
+
+ /**
+ * Get a Long value from an HBase result.
+ *
+ * @param result result set
+ * @param familyName column family name
+ * @param columnName column name
+ * @return the value, or 0L if the column is missing
+ */
+ private static Long getLong(Result result, String familyName, String columnName) {
+ byte[] familyBytes = Bytes.toBytes(familyName);
+ byte[] columnBytes = Bytes.toBytes(columnName);
+ boolean contains = result.containsColumn(familyBytes, columnBytes);
+ if (contains) {
+ return Bytes.toLong(result.getValue(familyBytes, columnBytes));
+ }
+ return 0L;
+ }
+
+ /**
+ * Get a TEID (stored as a long) from an HBase result and return it as a String.
+ *
+ * @param result result set
+ * @param columnName column name
+ * @return the TEID as a string, or "0" if the column is missing or blank
+ */
+ private static String getTeid(Result result, String columnName) {
+ byte[] familyBytes = Bytes.toBytes(FlowWriteConfig.GTPC_FAMILY_NAME);
+ byte[] columnBytes = Bytes.toBytes(columnName);
+ boolean contains = result.containsColumn(familyBytes, columnBytes);
+ if (contains) {
+ String data = String.valueOf(Bytes.toLong(result.getValue(familyBytes, columnBytes))).trim();
+ if (StringUtil.isNotBlank(data)) {
+ return data;
+ }
+ }
+ return "0";
+ }
+
+ /**
+ * Build the user information map.
+ *
+ * @param phoneNumber phone number
+ * @param imsi subscriber identity (IMSI)
+ * @param imei device identity (IMEI)
+ * @param lastUpdateTime last update time of the record
+ * @return user information map
+ */
+ private static HashMap<String, Object> buildUserData(String phoneNumber, String imsi, String imei, Long lastUpdateTime) {
+ HashMap<String, Object> tmpMap = new HashMap<>(4);
+ tmpMap.put("common_phone_number", phoneNumber);
+ tmpMap.put("common_imsi", imsi);
+ tmpMap.put("common_imei", imei);
+ tmpMap.put("last_update_time", lastUpdateTime);
+ return tmpMap;
+ }
+
+
+ /**
+ * Get the vsys_id (stored as an int) from an HBase result and return it as a String.
+ *
+ * @param result result set
+ * @return the vsys_id as a string, or "1" if the column is missing or blank
+ */
+ static String getVsysId(Result result) {
+ byte[] familyBytes = Bytes.toBytes(FlowWriteConfig.COMMON_FAMILY_NAME);
+ byte[] columnBytes = Bytes.toBytes("vsys_id");
+ boolean contains = result.containsColumn(familyBytes, columnBytes);
+ if (contains) {
+ String data = String.valueOf(Bytes.toInt(result.getValue(familyBytes, columnBytes))).trim();
+ if (StringUtil.isNotBlank(data)) {
+ return data;
+ }
+ }
+ return "1";
+ }
+
+ /**
+ * Compare the timestamp of the newly fetched data with the cached entry and update the cache only when the new timestamp is greater.
+ *
+ * @param gtpcMap cache map
+ * @param key uplink or downlink TEID (with vsys_id suffix in the default relationship model)
+ * @param userData user information fetched from HBase
+ * @param lastUpdateTime last update time of this user information
+ */
+ private static void updateCache(Map<String, HashMap<String, Object>> gtpcMap, String key, HashMap<String, Object> userData, Long lastUpdateTime) {
+ if (StringUtil.isNotBlank(key)){
+ if (gtpcMap.containsKey(key)) {
+ Long oldUpdateTime = Long.parseLong(gtpcMap.get(key).get("last_update_time").toString());
+ if (lastUpdateTime > oldUpdateTime) {
+ gtpcMap.put(key, userData);
+ }
+ } else {
+ gtpcMap.put(key, userData);
+ }
+ }
+ }
+
+
+}
diff --git a/src/test/java/com/zdjizhi/hdfs/FileUtilsTest.java b/src/test/java/com/zdjizhi/hdfs/FileUtilsTest.java
new file mode 100644
index 0000000..24f8696
--- /dev/null
+++ b/src/test/java/com/zdjizhi/hdfs/FileUtilsTest.java
@@ -0,0 +1,68 @@
+package com.zdjizhi.hdfs;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.FlowWriteConfig;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.tools.hdfs
+ * @Description:
+ * @date 2022/11/217:57
+ */
+public class FileUtilsTest {
+ private static final Log logger = LogFactory.get();
+
+ private static FileSystem fileSystem;
+
+ static {
+ Configuration configuration = new Configuration();
+ try {
+ //Create the FileSystem used to connect to HDFS
+ fileSystem = FileSystem.get(new URI(FlowWriteConfig.HDFS_URI),configuration);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (URISyntaxException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void mkdir() throws Exception{
+ fileSystem.mkdirs(new Path("/IPlocation/ETL-SESSION-RECORD-COMPLETED"));
+ }
+
+ @Test
+ public void create() throws Exception{
+ FSDataOutputStream outputStream = fileSystem.create(new Path("/qitest/test/test.txt"));
+ outputStream.write("Hello World".getBytes());
+ outputStream.flush();
+ outputStream.close();
+ }
+
+ @Test
+ public void cat() throws Exception{
+ FSDataInputStream inputStream = fileSystem.open(new Path("/qitest/test/test.txt"));
+ IOUtils.copyBytes(inputStream, System.out, 1024);
+ inputStream.close();
+ }
+
+ @Test
+ public void rename() throws Exception{
+ fileSystem.rename(new Path("/qitest"), new Path("/IPlocation"));
+ }
+
+ @Test
+ public void delete() throws Exception{
+ fileSystem.delete(new Path("/qitest/test2"), true); //true = delete recursively
+ }
+}
+
diff --git a/src/test/java/com/zdjizhi/hos/hosUtilsTest.java b/src/test/java/com/zdjizhi/hos/hosUtilsTest.java
new file mode 100644
index 0000000..27f4c60
--- /dev/null
+++ b/src/test/java/com/zdjizhi/hos/hosUtilsTest.java
@@ -0,0 +1,101 @@
+package com.zdjizhi.hos;
+
+import cn.hutool.core.io.IoUtil;
+import cn.hutool.http.HttpUtil;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Joiner;
+import com.maxmind.db.CHMCache;
+import com.maxmind.db.Reader;
+import com.zdjizhi.common.CustomFile;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.tools.http.HttpClientUtils;
+import com.zdjizhi.utils.IpLookupV2;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.http.Header;
+import org.apache.http.message.BasicHeader;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.hos
+ * @Description:
+ * @date 2022/11/713:55
+ */
+public class hosUtilsTest {
+ @Test
+ public void downloadToLocalTest() {
+ FileOutputStream outputStream = null;
+ InputStream inputStream = null;
+ try {
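+ //Download the mmdb file from HOS over HTTP and copy the stream into the local tools library directory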
+ HttpClientUtils httpClientUtils = new HttpClientUtils();
+ Header header = new BasicHeader("token", FlowWriteConfig.HOS_TOKEN);
+ String url = "http://192.168.44.12:9098/hos/knowledge_base_hos_bucket/a69c41bc-c673-46da-a810-f29aa18a3a16-aXBfdjRfYnVpbHRfaW4=.mmdb";
+ inputStream = httpClientUtils.httpGetInputStream(url, 3000, header);
+ File file = new File(FlowWriteConfig.TOOLS_LIBRARY.concat(File.separator).concat("ip_v4_built_in.mmdb"));
+ if (!file.getParentFile().exists()) {
+ file.getParentFile().mkdir();
+ }
+ outputStream = new FileOutputStream(file);
+ IoUtil.copy(inputStream, outputStream);
+ } catch (IOException | RuntimeException e) {
+ e.printStackTrace();
+ } finally {
+ IoUtil.close(inputStream);
+ IoUtil.close(outputStream);
+ }
+ }
+
+
+ @Test
+ public void locationTest() {
+ try {
+ CustomFile customFile = new CustomFile();
+ HttpClientUtils httpClientUtils = new HttpClientUtils();
+ Header header = new BasicHeader("token", FlowWriteConfig.HOS_TOKEN);
+ String url = "http://192.168.44.12:9098/hos/knowledge_base_hos_bucket/a69c41bc-c673-46da-a810-f29aa18a3a16-aXBfdjRfYnVpbHRfaW4=.mmdb";
+ InputStream inputStream = httpClientUtils.httpGetInputStream(url, 3000, header);
+ customFile.setFileName("ip_v4_built_in.mmdb");
+ Reader reader = new Reader(inputStream, new CHMCache());
+ InetAddress ipAddress = InetAddress.getByName("114.64.231.114");
+ JsonNode jsonNode = reader.get(ipAddress);
+ if (jsonNode != null) {
+ System.out.println(StringUtil.setDefaultIfEmpty(jsonNode.toString(), "unknown").toString());
+ System.out.println(Joiner.on(".").useForNull("").join(jsonNode.get("COUNTRY"),
+ jsonNode.get("SUPER_ADMINISTRATIVE_AREA"), jsonNode.get("ADMINISTRATIVE_AREA")));
+ System.out.println(Joiner.on(".").useForNull("").join(jsonNode.get("COUNTRY"),
+ jsonNode.get("SUPER_ADMINISTRATIVE_AREA"), jsonNode.get("ADMINISTRATIVE_AREA")).replace("\"", ""));
+ }
+ } catch (IOException | RuntimeException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void asnTest() {
+ try {
+ CustomFile customFile = new CustomFile();
+ HttpClientUtils httpClientUtils = new HttpClientUtils();
+ Header header = new BasicHeader("token", FlowWriteConfig.HOS_TOKEN);
+ String url = "http://192.168.44.12:9098/hos/knowledge_base_hos_bucket/1b96764c-59dd-4d6b-8edb-623705f708a5-YXNuX3Y0.mmdb";
+ InputStream inputStream = httpClientUtils.httpGetInputStream(url, 3000, header);
+ customFile.setFileName("asn_v4.mmdb");
+ Reader reader = new Reader(inputStream, new CHMCache());
+ InetAddress ipAddress = InetAddress.getByName("114.64.231.114");
+ JsonNode jsonNode = reader.get(ipAddress);
+ if (jsonNode != null) {
+ System.out.println(StringUtil.setDefaultIfEmpty(jsonNode.get("ASN"), "unknown").toString());
+ System.out.println(StringUtil.setDefaultIfEmpty(jsonNode.get("ASN"), "unknown").toString().replace("\"", ""));
+ System.out.println(StringUtil.setDefaultIfEmpty(jsonNode.toString(), "unknown").toString());
+ }
+ } catch (IOException | RuntimeException e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/src/test/java/com/zdjizhi/json/JsonPathTest.java b/src/test/java/com/zdjizhi/json/JsonPathTest.java
index cd7ada3..9f22954 100644
--- a/src/test/java/com/zdjizhi/json/JsonPathTest.java
+++ b/src/test/java/com/zdjizhi/json/JsonPathTest.java
@@ -1,23 +1,20 @@
package com.zdjizhi.json;
+import cn.hutool.json.JSONObject;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
-import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.tools.json.JsonPathUtil;
import org.junit.Test;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Properties;
-import java.util.concurrent.Executor;
/**
* @author qidaijie
@@ -28,52 +25,56 @@ import java.util.concurrent.Executor;
public class JsonPathTest {
private static final Log logger = LogFactory.get();
- private static Properties propNacos = new Properties();
+ @Test
+ public void arrayFilterTest() {
+ String json = "[{\"tunnels_schema_type\":\"GTP\",\"gtp_uplink_teid\":16777219,\"gtp_downlink_teid\":16777219,\"gtp_sgw_ip\":\"120.36.3.97\",\"gtp_pgw_ip\":\"43.224.53.100\",\"gtp_sgw_port\":2152,\"gtp_pgw_port\":51454},{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"80:69:33:ea:a5:57\",\"destination_mac\":\"14:09:dc:df:a3:40\"}]";
+ DocumentContext parse = JsonPath.parse(json);
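+ //JsonPath filter: keep only the tunnel objects whose tunnels_schema_type is GTP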
+ List<Object> fields = parse.read("$.[?(@.tunnels_schema_type=='GTP')]");
+ if (fields.size() > 0) {
+ JSONObject gtpJson = new JSONObject(fields.get(0), false, true);
+ System.out.println(gtpJson.getStr("gtp_uplink_teid"));
+ System.out.println(gtpJson.getStr("gtp_downlink_teid"));
+ } else {
+ System.out.println("no has");
+ }
- /**
- * 获取需要删除字段的列表
- */
- private static ArrayList<String> dropList = new ArrayList<>();
- /**
- * 在内存中加载反射类用的map
- */
- private static HashMap<String, Class> map;
+ ArrayList<Object> read = JsonPath.parse(json).read("$.[?(@.tunnels_schema_type=='GTP')].gtp_uplink_teid");
+ String s = read.get(0).toString();
+ System.out.println("string:" + s);
+ }
- /**
- * 获取任务列表
- * list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如:
- * (mail_subject mail_subject decode_of_base64 mail_subject_charset)
- */
- private static ArrayList<String[]> jobList;
- private static String schema;
+ @Test
+ public void getTeidTest() {
+ String message = "[{\"tunnels_schema_type\":\"GTP\",\"gtp_uplink_teid\":999997894,\"gtp_downlink_teid\":665547895,\"gtp_sgw_ip\":\"192.56.5.2\",\"gtp_pgw_ip\":\"192.56.10.20\",\"gtp_sgw_port\":2152,\"gtp_pgw_port\":2152},{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\",\"c2s_source_mac\":\"00:0c:29:9b:2f:a4\",\"c2s_destination_mac\":\"00:0c:29:15:b4:f4\",\"s2c_source_mac\":\"00:0c:29:15:b4:f4\",\"s2c_destination_mac\":\"00:0c:29:9b:2f:a4\"}]";
+ System.out.println(JsonPathUtil.getTeidValue(message, "$.[?(@.tunnels_schema_type=='GTP')].gtp_uplink_teid"));
+ System.out.println(JsonPathUtil.getTeidValue(message, "$.[?(@.tunnels_schema_type=='GTP')].gtp_downlink_teid"));
+ }
+
- static {
- propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, FlowWriteConfig.NACOS_SERVER);
- propNacos.setProperty(PropertyKeyConst.NAMESPACE, FlowWriteConfig.NACOS_SCHEMA_NAMESPACE);
- propNacos.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME);
- propNacos.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN);
+ @Test
+ public void getFileMetaTest() {
+ Properties properties = new Properties();
+ properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848");
+ properties.setProperty(PropertyKeyConst.NAMESPACE, "public");
+ properties.setProperty(PropertyKeyConst.USERNAME, "nacos");
+ properties.setProperty(PropertyKeyConst.PASSWORD, "nacos");
+ ConfigService configService = null;
try {
- ConfigService configService = NacosFactory.createConfigService(propNacos);
- String dataId = FlowWriteConfig.NACOS_DATA_ID;
- String group = FlowWriteConfig.NACOS_GROUP;
- String config = configService.getConfig(dataId, group, 5000);
- if (StringUtil.isNotBlank(config)) {
- schema = config;
+ configService = NacosFactory.createConfigService(properties);
+ String message = configService.getConfig("knowledge_base.json", "DEFAULT_GROUP", 5000);
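+ //Select the latest version of each built-in knowledge base file and keep only its name, sha256, format and path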
+ ArrayList<Object> read = JsonPath.parse(message).read("$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined','asn_v4','asn_v6'])].['name','sha256','format','path']");
+ if (read.size() >= 1) {
+ for (Object metadata : read) {
+ System.out.println(metadata.toString());
+ JSONObject knowledgeJson = new JSONObject(metadata, false, true);
+ System.out.println("fileName:" + knowledgeJson.getStr("name") + "." + knowledgeJson.getStr("format") + " ---- filePath:" + knowledgeJson.getStr("path") + "\n");
+ }
}
} catch (NacosException e) {
- logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage());
+ e.printStackTrace();
}
- }
- @Test
- public void parseSchemaGetFields() {
- DocumentContext parse = JsonPath.parse(schema);
- List<Object> fields = parse.read("$.fields[*]");
- for (Object field : fields) {
- String name = JsonPath.read(field, "$.name").toString();
- String type = JsonPath.read(field, "$.type").toString();
- }
}
}
diff --git a/src/test/java/com/zdjizhi/json/JsonTest.java b/src/test/java/com/zdjizhi/json/JsonTest.java
deleted file mode 100644
index 597da40..0000000
--- a/src/test/java/com/zdjizhi/json/JsonTest.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.zdjizhi.json;
-
-import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.json.JsonParseUtil;
-import org.junit.Test;
-
-import java.util.Map;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.json
- * @Description:
- * @date 2022/5/515:08
- */
-public class JsonTest {
-
- @Test
- public void JacksonTest() {
- String value = "{\"common_log_id\":null}";
- Map<String, Object> json = (Map<String, Object>) JsonMapper.fromJsonString(value, Map.class);
- System.out.println(json.get("common_log_id"));
- }
-} \ No newline at end of file
diff --git a/src/test/java/com/zdjizhi/json/NewSchemaTest.java b/src/test/java/com/zdjizhi/json/NewSchemaTest.java
new file mode 100644
index 0000000..18f14ff
--- /dev/null
+++ b/src/test/java/com/zdjizhi/json/NewSchemaTest.java
@@ -0,0 +1,102 @@
+package com.zdjizhi.json;
+
+import cn.hutool.json.JSONArray;
+import cn.hutool.json.JSONObject;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.nacos.api.NacosFactory;
+import com.alibaba.nacos.api.PropertyKeyConst;
+import com.alibaba.nacos.api.config.ConfigService;
+import com.alibaba.nacos.api.exception.NacosException;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.utils.StringUtil;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Properties;
+
+/**
+ * Applicable to schemas >= TSG22.08
+ *
+ * @author qidaijie
+ * @Package com.zdjizhi.nacos
+ * @Description:
+ * @date 2022/3/1714:57
+ */
+public class NewSchemaTest {
+
+ private static Properties properties = new Properties();
+
+
+ static {
+ properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848");
+ properties.setProperty(PropertyKeyConst.NAMESPACE, "test");
+ properties.setProperty(PropertyKeyConst.USERNAME, "nacos");
+ properties.setProperty(PropertyKeyConst.PASSWORD, "nacos");
+ }
+
+
+ @Test
+ public void newSchemaTest() {
+ try {
+
+ ConfigService configService = NacosFactory.createConfigService(properties);
+ String dataId = "session_record.json";
+ String group = "Galaxy";
+ ArrayList<String[]> newJobList = getNewJobList(configService.getConfig(dataId, group, 5000));
+
+ for (String[] job : newJobList) {
+ System.out.println(Arrays.toString(job));
+ }
+ } catch (NacosException e) {
+ e.printStackTrace();
+ }
+ }
+
+
+ /**
+ * Parse the schema and return a job list; each entry is a four-element array (source field, target field, function, param).
+ *
+ * @param schema log schema JSON
+ * @return job list
+ */
+ private static ArrayList<String[]> getNewJobList(String schema) {
+ ArrayList<String[]> list = new ArrayList<>();
+
+ JSONObject schemaJson = new JSONObject(schema, false, true);
+ JSONArray fields = schemaJson.getJSONArray("fields");
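+ //Each field may carry a "doc.format" array describing the completion functions to apply to it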
+ for (Object field : fields) {
+ JSONObject fieldJson = new JSONObject(field, false, true);
+ boolean hasDoc = fieldJson.containsKey("doc");
+ if (hasDoc) {
+ JSONObject docJson = fieldJson.getJSONObject("doc");
+ boolean hasFormat = docJson.containsKey("format");
+ if (hasFormat) {
+ String name = fieldJson.getStr("name");
+ JSONArray formatList = docJson.getJSONArray("format");
+ for (Object format : formatList) {
+ JSONObject formatJson = new JSONObject(format, false, true);
+ String function = formatJson.getStr("function");
+ String appendTo;
+ String params = null;
+
+ if (formatJson.containsKey("appendTo")) {
+ appendTo = formatJson.getStr("appendTo");
+ } else {
+ appendTo = name;
+ }
+
+ if (formatJson.containsKey("param")) {
+ params = formatJson.getStr("param");
+ }
+
+ list.add(new String[]{name, appendTo, function, params});
+
+ }
+ }
+ }
+ }
+
+ return list;
+ }
+
+}
diff --git a/src/test/java/com/zdjizhi/nacos/SchemaListener.java b/src/test/java/com/zdjizhi/json/OldSchemaTest.java
index 741b2a3..42c76db 100644
--- a/src/test/java/com/zdjizhi/nacos/SchemaListener.java
+++ b/src/test/java/com/zdjizhi/json/OldSchemaTest.java
@@ -1,4 +1,5 @@
-package com.zdjizhi.nacos;
+package com.zdjizhi.json;
+
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
@@ -6,7 +7,6 @@ import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
-import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.StringUtil;
@@ -15,67 +15,51 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;
-import java.util.concurrent.Executor;
/**
+ * Applicable to schemas < TSG22.08
* @author qidaijie
* @Package com.zdjizhi.nacos
* @Description:
* @date 2022/3/1714:57
*/
-public class SchemaListener {
+public class OldSchemaTest {
private static Properties properties = new Properties();
- private static ArrayList<String[]> jobList;
static {
- properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.40.43:8848");
- properties.setProperty(PropertyKeyConst.NAMESPACE, "test");
+ properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848");
+ properties.setProperty(PropertyKeyConst.NAMESPACE, "prod");
properties.setProperty(PropertyKeyConst.USERNAME, "nacos");
properties.setProperty(PropertyKeyConst.PASSWORD, "nacos");
+ }
+
+
+ @Test
+ public void oldSchemaTest() {
try {
ConfigService configService = NacosFactory.createConfigService(properties);
String dataId = "session_record.json";
String group = "Galaxy";
- jobList = getJobListFromHttp(configService.getConfig(dataId, group, 5000));
- configService.addListener(dataId, group, new Listener() {
- @Override
- public Executor getExecutor() {
- return null;
- }
+ ArrayList<String[]> oldJobList = getOldJobList(configService.getConfig(dataId, group, 5000));
- @Override
- public void receiveConfigInfo(String configMsg) {
- jobList = getJobListFromHttp(configMsg);
- }
- });
+ for (String[] job : oldJobList) {
+ System.out.println(Arrays.toString(job));
+ }
} catch (NacosException e) {
e.printStackTrace();
}
}
-
- @Test
- public void dealCommonMessage() {
- //keep running,change nacos config,print new config
- while (true) {
- try {
- System.out.println(Arrays.toString(jobList.get(0)));
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
/**
- * 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList paramlist)
+ * Parse the schema and return a job list; each entry is a four-element array (source field, target field, function, param).
*
+ * @param schema log schema JSON
* @return 任务列表
*/
- private static ArrayList<String[]> getJobListFromHttp(String schema) {
+ private static ArrayList<String[]> getOldJobList(String schema) {
ArrayList<String[]> list = new ArrayList<>();
//Get the fields and convert them to an array; each element contains name, doc and type
@@ -132,5 +116,4 @@ public class SchemaListener {
}
return list;
}
-
}
diff --git a/src/test/java/com/zdjizhi/nacos/NacosTest.java b/src/test/java/com/zdjizhi/nacos/NacosTest.java
index 7745d5f..55dca3f 100644
--- a/src/test/java/com/zdjizhi/nacos/NacosTest.java
+++ b/src/test/java/com/zdjizhi/nacos/NacosTest.java
@@ -90,7 +90,7 @@ public class NacosTest {
}
//keep running,change nacos config,print new config
- while (true) {
+ for (int i = 0; i < 3; i++) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
diff --git a/src/test/java/com/zdjizhi/nacos/SchemaListenerTest.java b/src/test/java/com/zdjizhi/nacos/SchemaListenerTest.java
new file mode 100644
index 0000000..72ae2e0
--- /dev/null
+++ b/src/test/java/com/zdjizhi/nacos/SchemaListenerTest.java
@@ -0,0 +1,116 @@
+package com.zdjizhi.nacos;
+
+import cn.hutool.json.JSONArray;
+import cn.hutool.json.JSONObject;
+import com.alibaba.nacos.api.NacosFactory;
+import com.alibaba.nacos.api.PropertyKeyConst;
+import com.alibaba.nacos.api.config.ConfigService;
+import com.alibaba.nacos.api.config.listener.Listener;
+import com.alibaba.nacos.api.exception.NacosException;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.nacos
+ * @Description:
+ * @date 2022/3/1714:57
+ */
+public class SchemaListenerTest {
+
+ private static Properties properties = new Properties();
+ private static ArrayList<String[]> jobList;
+
+
+ static {
+ properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848");
+ properties.setProperty(PropertyKeyConst.NAMESPACE, "test");
+ properties.setProperty(PropertyKeyConst.USERNAME, "nacos");
+ properties.setProperty(PropertyKeyConst.PASSWORD, "nacos");
+
+ try {
+ ConfigService configService = NacosFactory.createConfigService(properties);
+ String dataId = "session_record.json";
+ String group = "Galaxy";
+ jobList = getJobListFromHttp(configService.getConfig(dataId, group, 5000));
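+ //Register a listener so that jobList is rebuilt whenever the schema config changes in Nacos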
+ configService.addListener(dataId, group, new Listener() {
+ @Override
+ public Executor getExecutor() {
+ return null;
+ }
+
+ @Override
+ public void receiveConfigInfo(String configMsg) {
+ jobList = getJobListFromHttp(configMsg);
+ }
+ });
+ } catch (NacosException e) {
+ e.printStackTrace();
+ }
+ }
+
+
+ @Test
+ public void dealCommonMessage() {
+ //print the job list parsed from the current nacos config (runs once)
+ for (int i = 0; i < 1; i++) {
+ try {
+ for (String[] job : jobList) {
+ System.out.println(Arrays.toString(job));
+ }
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ /**
+ * Parse the schema obtained from Nacos and return a job list; each entry is a four-element array (source field, target field, function, param).
+ *
+ * @param schema log schema JSON
+ * @return job list
+ */
+ private static ArrayList<String[]> getJobListFromHttp(String schema) {
+ ArrayList<String[]> list = new ArrayList<>();
+
+ JSONObject schemaJson = new JSONObject(schema, false, true);
+ JSONArray fields = schemaJson.getJSONArray("fields");
+ for (Object field : fields) {
+ JSONObject fieldJson = new JSONObject(field, false, true);
+ boolean hasDoc = fieldJson.containsKey("doc");
+ if (hasDoc) {
+ JSONObject docJson = fieldJson.getJSONObject("doc");
+ boolean hasFormat = docJson.containsKey("format");
+ if (hasFormat) {
+ String name = fieldJson.getStr("name");
+ JSONArray formatList = docJson.getJSONArray("format");
+ for (Object format : formatList) {
+ JSONObject formatJson = new JSONObject(format, false, true);
+ String function = formatJson.getStr("function");
+ String appendTo = null;
+ String params = null;
+
+ if (formatJson.containsKey("appendTo")) {
+ appendTo = formatJson.getStr("appendTo");
+ } else {
+ appendTo = name;
+ }
+
+ if (formatJson.containsKey("param")) {
+ params = formatJson.getStr("param");
+ }
+
+ list.add(new String[]{name, appendTo, function, params});
+
+ }
+ }
+ }
+ }
+
+ return list;
+ }
+}