| author | qidaijie <[email protected]> | 2022-11-10 19:28:15 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2022-11-10 19:28:15 +0800 |
| commit | be73655ec4c5d9dd7930ed39e797dfee520a02e1 | |
| tree | 0bf3d790aef6dd702ea4a2292a8aac53a6f0524a | |
| parent | e7a5ecb4f7a338ed156f6f454947728c293617f7 | |
Initial version of dynamic knowledge-base loading support for the ETL function. (GAL-223)
35 files changed, 1919 insertions, 571 deletions
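For readers skimming the diff below, the core of the change is a Flink broadcast-state wiring: a single-parallelism `KnowledgeBaseSource` listens to a Nacos data ID, downloads changed `.mmdb` files over HTTP (mirroring them to HDFS or the local filesystem), and broadcasts them as `List<CustomFile>` into the main Kafka enrichment stream, where `MapCompleted`/`TypeMapCompleted` rebuild their `IpLookup` readers on the fly. The following is an editorial, condensed sketch of that wiring using class names taken from the diff; parallelism settings, operator names, the `LOG_TRANSFORM_TYPE` switch, and the job name are simplified and illustrative only.

```java
import com.zdjizhi.common.CustomFile;
import com.zdjizhi.tools.functions.broadcast.KnowledgeBaseSource;
import com.zdjizhi.tools.functions.map.MapCompleted;
import com.zdjizhi.tools.kafka.KafkaConsumer;
import com.zdjizhi.tools.kafka.KafkaProducer;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;

public class BroadcastWiringSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // State descriptor that carries the broadcast elements to the process function.
        MapStateDescriptor<String, List<CustomFile>> descriptor = new MapStateDescriptor<>(
                "descriptor", Types.STRING, Types.LIST(TypeInformation.of(CustomFile.class)));

        // Single-parallelism side stream: watches the Nacos knowledge-base config and
        // emits the downloaded .mmdb files whenever a file's sha256 changes.
        BroadcastStream<List<CustomFile>> knowledgeBase = env
                .addSource(new KnowledgeBaseSource())
                .setParallelism(1)
                .broadcast(descriptor);

        // Main log stream connected to the broadcast stream: MapCompleted enriches each
        // record in processElement() and swaps the in-memory IpLookup readers in
        // processBroadcastElement() when new knowledge-base files arrive.
        env.addSource(KafkaConsumer.timestampDeserializationConsumer())
                .connect(knowledgeBase)
                .process(new MapCompleted())
                .addSink(KafkaProducer.getKafkaProducer());

        env.execute("log-completion-knowledge-base-sketch");  // illustrative job name
    }
}
```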
@@ -6,7 +6,7 @@ <groupId>com.zdjizhi</groupId> <artifactId>log-completion-schema</artifactId> - <version>20221103-Base64</version> + <version>20221110-KNOWLEDGEBASE</version> <name>log-completion-schema</name> <url>http://www.example.com</url> diff --git a/properties/default_config.properties b/properties/default_config.properties index 9495f9c..06021b9 100644 --- a/properties/default_config.properties +++ b/properties/default_config.properties @@ -44,7 +44,27 @@ nacos.username=kANxu/Zi5rBnZVxa5zAjrQ== nacos.pin=YPIBDIXjJUtVBjjk2op0Dg== #nacos group -nacos.group=Galaxy +nacos.schema.group=Galaxy + +#nacos group +nacos.knowledgebase.group=DEFAULT_GROUP + +#================= HTTP 配置 ====================# +#max connection +http.pool.max.connection=400 + +#one route max connection +http.pool.max.per.route=80 + +#connect timeout(ms) +http.pool.connect.timeout=60000 + +#request timeout(ms) +http.pool.request.timeout=60000 + +#response timeout(ms) +http.pool.response.timeout=60000 + #====================Topology Default====================# #hbase radius relation table name hbase.radius.table.name=tsg_galaxy:relation_framedip_account diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index f76f57d..4289e44 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,5 +1,4 @@ #--------------------------------地址配置------------------------------# - #管理kafka地址 source.kafka.servers=192.168.44.12:9094 @@ -12,21 +11,39 @@ zookeeper.servers=192.168.44.12:2181 #hbase zookeeper地址 用于连接HBase hbase.zookeeper.servers=192.168.44.12:2181 +#hdfs地址用于获取定位库 +hdfs.uri=hdfs://192.168.40.151:9000 + #--------------------------------HTTP/定位库------------------------------# -#定位库地址 -tools.library=D:\\workerspace\\dat\\ +#hos token +hos.token=c21f969b5f03d33d43e04f8f136e7682 + +#定位库存储文件系统类型,hdfs or local +knowledgebase.file.system.type=hdfs + +#定位库地址,根据file.system.type配置填写对应地址路径。 +knowledgebase.library=/qitest/ + +#工具库地址,存放秘钥文件等。 +tools.library=E:\\workerspace\\dat\\ #--------------------------------nacos配置------------------------------# #nacos 地址 nacos.server=192.168.44.12:8848 -#nacos namespace +#schema namespace名称 nacos.schema.namespace=test -#nacos data id -nacos.data.id=session_record.json +#schema data id名称 +nacos.schema.data.id=session_record.json + +#knowledgebase namespace名称 +nacos.knowledgebase.namespace=public + +#knowledgebase data id名称 +nacos.knowledgebase.data.id=knowledge_base.json -#--------------------------------Kafka消费组信息------------------------------# +#--------------------------------Kafka消费/生产配置------------------------------# #kafka 接收数据topic source.kafka.topic=test @@ -35,10 +52,7 @@ source.kafka.topic=test sink.kafka.topic=test-result #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; -group.id=type-test-20220810-1 - -#生产者压缩模式 none or snappy -producer.kafka.compression.type=none +group.id=session-record-log-20211105-1 #--------------------------------topology配置------------------------------# @@ -51,13 +65,17 @@ transform.parallelism=1 #kafka producer 并行度 sink.parallelism=1 -#数据中心,取值范围(0-63) -data.center.id.num=16 +#数据中心,取值范围(0-31) +data.center.id.num=0 #hbase 更新时间,如填写0则不更新缓存 -hbase.tick.tuple.freq.secs=60 +hbase.tick.tuple.freq.secs=180 #--------------------------------默认值配置------------------------------# #0不需要补全原样输出日志,1需要补全 log.need.complete=1 + +#生产者压缩模式 none or snappy +producer.kafka.compression.type=none + diff --git a/src/main/java/com/zdjizhi/common/CustomFile.java 
b/src/main/java/com/zdjizhi/common/CustomFile.java new file mode 100644 index 0000000..1c80f87 --- /dev/null +++ b/src/main/java/com/zdjizhi/common/CustomFile.java @@ -0,0 +1,28 @@ +package com.zdjizhi.common; + +import java.io.Serializable; + +public class CustomFile implements Serializable { + + private String fileName; + + private byte[] content; + + public String getFileName() { + return fileName; + } + + public void setFileName(String fileName) { + this.fileName = fileName; + } + + public byte[] getContent() { + return content; + } + + public void setContent(byte[] content) { + this.content = content; + } + + +} diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java index 7f92b02..e1804c9 100644 --- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java +++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java @@ -16,6 +16,17 @@ public class FlowWriteConfig { } public static final int IF_PARAM_LENGTH = 3; + + /** + * 定位库默认分隔符 + */ + public static final String LOCATION_SEPARATOR = "."; + + /** + * 默认的文件系统标识 + */ + public static final String FILE_SYSTEM_TYPE = "hdfs"; + /** * 有此标识的字段为失效字段,不计入最终日志字段 */ @@ -48,8 +59,11 @@ public class FlowWriteConfig { */ public static final String NACOS_SERVER = FlowWriteConfigurations.getStringProperty(0, "nacos.server"); public static final String NACOS_SCHEMA_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.schema.namespace"); - public static final String NACOS_DATA_ID = FlowWriteConfigurations.getStringProperty(0, "nacos.data.id"); - public static final String NACOS_GROUP = FlowWriteConfigurations.getStringProperty(1, "nacos.group"); + public static final String NACOS_SCHEMA_DATA_ID = FlowWriteConfigurations.getStringProperty(0, "nacos.schema.data.id"); + public static final String NACOS_KNOWLEDGEBASE_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.knowledgebase.namespace"); + public static final String NACOS_KNOWLEDGEBASE_DATA_ID = FlowWriteConfigurations.getStringProperty(0, "nacos.knowledgebase.data.id"); + public static final String NACOS_KNOWLEDGEBASE_GROUP = FlowWriteConfigurations.getStringProperty(1, "nacos.knowledgebase.group"); + public static final String NACOS_SCHEMA_GROUP = FlowWriteConfigurations.getStringProperty(1, "nacos.schema.group"); public static final String NACOS_USERNAME = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "nacos.username")); public static final String NACOS_PIN = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "nacos.pin")); @@ -76,6 +90,26 @@ public class FlowWriteConfig { public static final String HBASE_GTPC_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.gtpc.table.name"); /** + * HDFS + */ + public static final String HDFS_URI = FlowWriteConfigurations.getStringProperty(0, "hdfs.uri"); + public static final String KNOWLEDGEBASE_FILE_SYSTEM_TYPE = FlowWriteConfigurations.getStringProperty(0, "knowledgebase.file.system.type"); + + /** + * HOS + */ + public static final String HOS_TOKEN = FlowWriteConfigurations.getStringProperty(0, "hos.token"); + + /** + * HTTP + */ + public static final Integer HTTP_POOL_MAX_CONNECTION = FlowWriteConfigurations.getIntProperty(1, "http.pool.max.connection"); + public static final Integer HTTP_POOL_MAX_PER_ROUTE = FlowWriteConfigurations.getIntProperty(1, "http.pool.max.per.route"); + public static final Integer HTTP_POOL_REQUEST_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "http.pool.request.timeout"); + public static 
final Integer HTTP_POOL_CONNECT_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "http.pool.connect.timeout"); + public static final Integer HTTP_POOL_RESPONSE_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "http.pool.response.timeout"); + + /** * kafka common */ public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.user")); @@ -114,6 +148,7 @@ public class FlowWriteConfig { public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.servers"); public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers"); public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library"); + public static final String KNOWLEDGEBASE_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "knowledgebase.library"); public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers"); }
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/tools/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/tools/functions/MapCompletedFunction.java deleted file mode 100644 index 6b8df35..0000000 --- a/src/main/java/com/zdjizhi/tools/functions/MapCompletedFunction.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.zdjizhi.tools.functions; - - -import com.zdjizhi.tools.general.TransFormMap; -import org.apache.flink.api.common.functions.MapFunction; - -import java.util.Map; - - -/** - * @author qidaijie - * @Package com.zdjizhi.utils.functions - * @Description: - * @date 2021/5/2715:01 - */ -public class MapCompletedFunction implements MapFunction<Map<String, Object>, String> { - - @Override - @SuppressWarnings("unchecked") - public String map(Map<String, Object> logs) { - return TransFormMap.dealCommonMessage(logs); - } -} diff --git a/src/main/java/com/zdjizhi/tools/functions/TypeMapCompletedFunction.java b/src/main/java/com/zdjizhi/tools/functions/TypeMapCompletedFunction.java deleted file mode 100644 index 1b8dd6a..0000000 --- a/src/main/java/com/zdjizhi/tools/functions/TypeMapCompletedFunction.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.zdjizhi.tools.functions; - -import com.zdjizhi.tools.general.TransFormTypeMap; -import org.apache.flink.api.common.functions.MapFunction; - -import java.util.Map; - - -/** - * @author qidaijie - * @Package com.zdjizhi.utils.functions - * @Description: - * @date 2021/5/2715:01 - */ -public class TypeMapCompletedFunction implements MapFunction<Map<String, Object>, String> { - - @Override - @SuppressWarnings("unchecked") - public String map(Map<String, Object> logs) { - - return TransFormTypeMap.dealCommonMessage(logs); - } -} diff --git a/src/main/java/com/zdjizhi/tools/functions/broadcast/KnowledgeBaseSource.java b/src/main/java/com/zdjizhi/tools/functions/broadcast/KnowledgeBaseSource.java new file mode 100644 index 0000000..fa32b70 --- /dev/null +++ b/src/main/java/com/zdjizhi/tools/functions/broadcast/KnowledgeBaseSource.java @@ -0,0 +1,174 @@ +package com.zdjizhi.tools.functions.broadcast; + +import cn.hutool.core.io.FileUtil; +import cn.hutool.core.io.IoUtil; +import cn.hutool.json.JSONObject; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.config.listener.Listener; +import com.alibaba.nacos.api.exception.NacosException; +import com.google.common.base.Joiner; +import com.jayway.jsonpath.JsonPath; +import com.zdjizhi.common.CustomFile; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.tools.hdfs.HdfsFileUtils; +import com.zdjizhi.tools.http.HttpClientUtils; +import com.zdjizhi.utils.StringUtil; +import org.apache.commons.io.IOUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.http.Header; +import org.apache.http.message.BasicHeader; + +import java.io.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.Executor; + +/** + * @author qidaijie + * @Package com.zdjizhi.tools.functions + * @Description: + * @date 2022/11/3 10:37 + */ +public class KnowledgeBaseSource extends RichSourceFunction<List<CustomFile>> { + private static final Log logger = LogFactory.get(); + //文件系统默认标识 + private static final String FILE_SYSTEM_TYPE = "hdfs"; + 
//JSONpath 表达式 + private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined','asn_v4','asn_v6'])].['name','sha256','format','path']"; + //各文件sha256缓存 + private static HashMap<String, String> knowledgeMetaCache = new HashMap<>(16); + //连接nacos的配置 + private static Properties properties = new Properties(); + //nacos Service + private ConfigService configService; + private boolean isRunning = true; + + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + properties.setProperty(PropertyKeyConst.SERVER_ADDR, FlowWriteConfig.NACOS_SERVER); + properties.setProperty(PropertyKeyConst.NAMESPACE, FlowWriteConfig.NACOS_KNOWLEDGEBASE_NAMESPACE); + properties.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME); + properties.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN); + configService = NacosFactory.createConfigService(properties); + } + + @Override + public void run(SourceContext<List<CustomFile>> ctx) { + try { + String dataId = FlowWriteConfig.NACOS_KNOWLEDGEBASE_DATA_ID; + String group = FlowWriteConfig.NACOS_KNOWLEDGEBASE_GROUP; + String configMsg = configService.getConfig(dataId, group, 5000); + if (StringUtil.isNotBlank(configMsg)) { + ArrayList<Object> metaList = JsonPath.parse(configMsg).read(EXPR); + if (metaList.size() >= 1) { + for (Object metadata : metaList) { + JSONObject knowledgeJson = new JSONObject(metadata, false, true); + String fileName = Joiner.on(FlowWriteConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"), + knowledgeJson.getStr("format")); + String sha256 = knowledgeJson.getStr("sha256"); + knowledgeMetaCache.put(fileName, sha256); + } + } + } + + configService.addListener(dataId, group, new Listener() { + @Override + public Executor getExecutor() { + return null; + } + + @Override + public void receiveConfigInfo(String configMsg) { + if (StringUtil.isNotBlank(configMsg)) { + ArrayList<CustomFile> customFiles = new ArrayList<>(); + ArrayList<Object> metaList = JsonPath.parse(configMsg).read(EXPR); + if (metaList.size() >= 1) { + for (Object metadata : metaList) { + JSONObject knowledgeJson = new JSONObject(metadata, false, true); + String fileName = Joiner.on(FlowWriteConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"), + knowledgeJson.getStr("format")); + String sha256 = knowledgeJson.getStr("sha256"); + String filePath = knowledgeJson.getStr("path"); + if (knowledgeMetaCache.containsKey(fileName)) { + if (!sha256.equals(knowledgeMetaCache.get(fileName))) { + CustomFile customFile = loadKnowledge(fileName, filePath); + customFiles.add(customFile); + knowledgeMetaCache.put(fileName, sha256); + + } + } else { + knowledgeMetaCache.put(fileName, sha256); + CustomFile customFile = loadKnowledge(fileName, filePath); + customFiles.add(customFile); + } + } + } + ctx.collect(customFiles); + } + } + }); + + while (true){ + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } catch (NacosException e) { + logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage()); + } + } + + /** + * 根据知识库返回的地址下载文件,并判断使用的文件系统类型对应更新本地文件 + * + * @param fileName 文件名 + * @param filePath 文件下载地址 + * @return + */ + private CustomFile loadKnowledge(String fileName, String filePath) { + InputStream inputStream = null; + FileOutputStream outputStream = null; + CustomFile customFile = new CustomFile(); + try 
{ + customFile.setFileName(fileName); + Header header = new BasicHeader("token", FlowWriteConfig.HOS_TOKEN); + HttpClientUtils httpClientUtils = new HttpClientUtils(); + inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header); + if (FILE_SYSTEM_TYPE.equals(FlowWriteConfig.KNOWLEDGEBASE_FILE_SYSTEM_TYPE)) { + byte[] bytes = IOUtils.toByteArray(inputStream); + HdfsFileUtils.uploadFileByBytes(FlowWriteConfig.KNOWLEDGEBASE_LIBRARY + fileName, bytes); + customFile.setContent(bytes); + } else { + FileUtil.mkdir(FlowWriteConfig.KNOWLEDGEBASE_LIBRARY); + File file = new File(FlowWriteConfig.KNOWLEDGEBASE_LIBRARY.concat(File.separator).concat(fileName)); + outputStream = new FileOutputStream(file); + byte[] bytes = IOUtils.toByteArray(inputStream); + customFile.setContent(bytes); + inputStream = new ByteArrayInputStream(customFile.getContent()); + IoUtil.copy(inputStream, outputStream); + } + } catch (IOException ioException) { + ioException.printStackTrace(); + } finally { + IOUtils.closeQuietly(inputStream); + IOUtils.closeQuietly(outputStream); + } + return customFile; + } + + @Override + public void cancel() { + this.isRunning = false; + } +} diff --git a/src/main/java/com/zdjizhi/tools/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/tools/functions/filter/FilterNull.java index e5f8526..47d6201 100644 --- a/src/main/java/com/zdjizhi/tools/functions/FilterNullFunction.java +++ b/src/main/java/com/zdjizhi/tools/functions/filter/FilterNull.java @@ -1,4 +1,4 @@ -package com.zdjizhi.tools.functions; +package com.zdjizhi.tools.functions.filter; import com.zdjizhi.utils.StringUtil; import org.apache.flink.api.common.functions.FilterFunction; @@ -9,7 +9,7 @@ import org.apache.flink.api.common.functions.FilterFunction; * @Description: * @date 2021/5/2715:01 */ -public class FilterNullFunction implements FilterFunction<String> { +public class FilterNull implements FilterFunction<String> { @Override public boolean filter(String message) { return StringUtil.isNotBlank(message); diff --git a/src/main/java/com/zdjizhi/tools/functions/map/MapCompleted.java b/src/main/java/com/zdjizhi/tools/functions/map/MapCompleted.java new file mode 100644 index 0000000..0559663 --- /dev/null +++ b/src/main/java/com/zdjizhi/tools/functions/map/MapCompleted.java @@ -0,0 +1,74 @@ +package com.zdjizhi.tools.functions.map; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.CustomFile; +import com.zdjizhi.tools.general.IpLookup; +import com.zdjizhi.tools.general.TransForm; +import com.zdjizhi.tools.json.JsonParseUtil; +import com.zdjizhi.utils.JsonMapper; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; +import org.apache.flink.util.Collector; + +import java.util.List; +import java.util.Map; + + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/5/2715:01 + */ +public class MapCompleted extends BroadcastProcessFunction<Map<String, Object>, List<CustomFile>, String> { + private static final Log logger = LogFactory.get(); + + private static IpLookup ipLookup; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + ipLookup = new IpLookup(); + ipLookup.loadIpLookup(); + } + + @Override + public void processElement(Map<String, Object> value, ReadOnlyContext readOnlyContext, Collector<String> out) { + try { + JsonParseUtil.dropJsonField(value); + for (String[] strings : 
JsonParseUtil.getJobList()) { + //该日志字段的值 + Object logValue = JsonParseUtil.getValue(value, strings[0]); + //结果值映射到的日志字段key + String appendToKey = strings[1]; + //匹配操作函数的字段 + String function = strings[2]; + //额外的参数的值 + String param = strings[3]; + + //结果值映射到的日志字段原始value + Object appendToValue = JsonParseUtil.getValue(value, appendToKey); + + TransForm.functionSet(function, value, appendToKey, appendToValue, logValue, param, ipLookup); + } + + out.collect(JsonMapper.toJsonString(value)); + + } catch (RuntimeException e) { + logger.error("TransForm log failed ( The field type is not verified ),The exception is :" + e); + e.printStackTrace(); + } + } + + @Override + public void processBroadcastElement(List<CustomFile> customFiles, Context context, Collector<String> out) throws Exception { + if (!customFiles.isEmpty()) { + for (CustomFile customFile : customFiles) { + logger.info("定位库名称:" + customFile.getFileName() + ",更新文件字节长度:" + customFile.getContent().length); + ipLookup.updateIpLookup(customFile.getFileName(), customFile.getContent()); + } + } + } + +} diff --git a/src/main/java/com/zdjizhi/tools/functions/map/TypeMapCompleted.java b/src/main/java/com/zdjizhi/tools/functions/map/TypeMapCompleted.java new file mode 100644 index 0000000..c10ee68 --- /dev/null +++ b/src/main/java/com/zdjizhi/tools/functions/map/TypeMapCompleted.java @@ -0,0 +1,74 @@ +package com.zdjizhi.tools.functions.map; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.CustomFile; +import com.zdjizhi.tools.general.IpLookup; +import com.zdjizhi.tools.general.TransForm; +import com.zdjizhi.tools.json.JsonParseUtil; +import com.zdjizhi.utils.JsonMapper; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; +import org.apache.flink.util.Collector; + +import java.util.List; +import java.util.Map; + + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/5/2715:01 + */ +public class TypeMapCompleted extends BroadcastProcessFunction<Map<String, Object>, List<CustomFile>, String> { + private static final Log logger = LogFactory.get(); + + private static IpLookup ipLookup; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + ipLookup = new IpLookup(); + ipLookup.loadIpLookup(); + } + + @Override + public void processElement(Map<String, Object> value, ReadOnlyContext readOnlyContext, Collector<String> out) throws Exception { + try { + Map<String, Object> jsonMap = JsonParseUtil.typeTransform(value); + for (String[] strings : JsonParseUtil.getJobList()) { + //该日志字段的值 + Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]); + //结果值映射到的日志字段key + String appendToKey = strings[1]; + //匹配操作函数的字段 + String function = strings[2]; + //额外的参数的值 + String param = strings[3]; + + //结果值映射到的日志字段原始value + Object appendToValue = JsonParseUtil.getValue(jsonMap, appendToKey); + + TransForm.functionSet(function, jsonMap, appendToKey, appendToValue, logValue, param, ipLookup); + } + + out.collect(JsonMapper.toJsonString(jsonMap)); + + } catch (RuntimeException e) { + logger.error("TransForm logs failed( The field type is verified ),The exception is :" + e); + e.printStackTrace(); + } + } + + @Override + public void processBroadcastElement(List<CustomFile> customFiles, Context context, Collector<String> out) throws Exception { + if (!customFiles.isEmpty()) { + for (CustomFile customFile : customFiles) { + logger.info("定位库名称:" + 
customFile.getFileName() + ",更新文件字节长度:" + customFile.getContent().length); + ipLookup.updateIpLookup(customFile.getFileName(), customFile.getContent()); + } + } + } + +} diff --git a/src/main/java/com/zdjizhi/tools/general/IpLookup.java b/src/main/java/com/zdjizhi/tools/general/IpLookup.java new file mode 100644 index 0000000..ed92de0 --- /dev/null +++ b/src/main/java/com/zdjizhi/tools/general/IpLookup.java @@ -0,0 +1,220 @@ +package com.zdjizhi.tools.general; + +import cn.hutool.core.io.file.FileReader; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Joiner; +import com.maxmind.db.CHMCache; +import com.maxmind.db.Reader; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.tools.hdfs.HdfsFileUtils; +import com.zdjizhi.utils.IPUtil; +import com.zdjizhi.utils.StringUtil; +import org.apache.commons.io.IOUtils; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; + +public class IpLookup { + private static final Log logger = LogFactory.get(); + private static final String LOCATION_SEPARATOR = "."; + private static final String PRIVATE_IP = "Private IP"; + private static final String UNKNOWN = ""; + + private static final String[] FILES = {"ip_v4_built_in.mmdb", + "ip_v4_user_defined.mmdb", + "ip_v6_built_in.mmdb", + "ip_v6_user_defined.mmdb", + "asn_v4.mmdb", + "asn_v6.mmdb"}; + + private Reader ipLocationPublicReaderV4 = null; + private Reader ipLocationPublicReaderV6 = null; + private Reader ipLocationPrivateReaderV4 = null; + private Reader ipLocationPrivateReaderV6 = null; + private Reader asnV4Reader = null; + private Reader asnV6Reader = null; + + /** + * 初始化定位库,根据文件系统类型判断从何地获取文件。 + */ + public void loadIpLookup() { + try { + if (FlowWriteConfig.FILE_SYSTEM_TYPE.equals(FlowWriteConfig.KNOWLEDGEBASE_FILE_SYSTEM_TYPE)) { + for (String name : FILES) { + byte[] fileBytes = HdfsFileUtils.downloadFileByBytes(FlowWriteConfig.KNOWLEDGEBASE_LIBRARY + name); + if (fileBytes != null) { + updateIpLookup(name, fileBytes); + } + } + } else { + for (String name : FILES) { + byte[] fileBytes = new FileReader(FlowWriteConfig.KNOWLEDGEBASE_LIBRARY + name).readBytes(); + if (fileBytes != null) { + updateIpLookup(name, fileBytes); + } + } + } + + } catch (RuntimeException e) { + logger.error("IpLookup init data error, please check!"); + } + } + + /** + * 更新定位库 + * + * @param name 文件名 + * @param data 数据 + */ + public void updateIpLookup(String name, byte[] data) { + InputStream inputStream = null; + try { + inputStream = new ByteArrayInputStream(data); + switch (name) { + case "ip_v4_built_in.mmdb": + this.ipLocationPublicReaderV4 = new Reader(inputStream, new CHMCache()); + break; + case "ip_v4_user_defined.mmdb": + this.ipLocationPrivateReaderV4 = new Reader(inputStream, new CHMCache()); + break; + case "ip_v6_built_in.mmdb": + this.ipLocationPublicReaderV6 = new Reader(inputStream, new CHMCache()); + break; + case "ip_v6_user_defined.mmdb": + this.ipLocationPrivateReaderV6 = new Reader(inputStream, new CHMCache()); + break; + case "asn_v4.mmdb": + this.asnV4Reader = new Reader(inputStream, new CHMCache()); + break; + case "asn_v6.mmdb": + this.asnV6Reader = new Reader(inputStream, new CHMCache()); + break; + } + } catch (IOException ioe) { + logger.error("The IpLookup database data conversion error, please check!"); + } catch (RuntimeException e) { + logger.error("IpLookup update data error, please 
check!"); + } finally { + IOUtils.closeQuietly(inputStream); + } + } + + /** + * 获取ASN信息 + * + * @param ip ip地址 + * @return asn信息 + */ + String asnLookup(String ip) { + JsonNode location = getAsn(ip); + if (IPUtil.internalIp(ip) || StringUtil.isEmpty(location)) { + return UNKNOWN; + } else { + return StringUtil.setDefaultIfEmpty(location.get("ASN"), UNKNOWN).toString().replace("\"", ""); + } + } + + /** + * 获取省市县定位信息 + * + * @param ip ip地址 + * @return 返回省市县信息 + */ + String cityLookupDetail(String ip) { + if (IPUtil.internalIp(ip)) { + return PRIVATE_IP; + } + JsonNode location = getLocation(ip); + if (location == null) { + return UNKNOWN; + } else { + return Joiner.on(LOCATION_SEPARATOR).useForNull("").join(location.get("COUNTRY"), + location.get("SUPER_ADMINISTRATIVE_AREA"), location.get("ADMINISTRATIVE_AREA")).replace("\"", ""); + } + } + + /** + * 获取国家定位信息 + * + * @param ip ip地址 + * @return 返回国家信息 + */ + String countryLookup(String ip) { + if (IPUtil.internalIp(ip)) { + return PRIVATE_IP; + } + JsonNode location = getLocation(ip); + if (location == null) { + return UNKNOWN; + } else { + return StringUtil.setDefaultIfEmpty(location.get("COUNTRY"), UNKNOWN).toString().replace("\"", ""); + } + } + + /** + * 从地理位置定位库获取相关信息 + * + * @param ip ip地址 + * @return ip对应地理位置信息 + */ + private JsonNode getLocation(String ip) { + JsonNode jsonNode = null; + try { + InetAddress ipAddress = InetAddress.getByName(ip); + + if (IPUtil.isIPAddress(ip)) { + + if (IPUtil.isIPv4Address(ip)) { + jsonNode = ipLocationPrivateReaderV4.get(ipAddress); + if (StringUtil.isEmpty(jsonNode)) { + jsonNode = ipLocationPublicReaderV4.get(ipAddress); + } + } else if (IPUtil.isIPv6Address(ip)) { + jsonNode = ipLocationPrivateReaderV6.get(ipAddress); + if (StringUtil.isEmpty(jsonNode)) { + jsonNode = ipLocationPublicReaderV6.get(ipAddress); + } + } + } + } catch (RuntimeException e) { + logger.error("ip address :" + ip + ", parser error " + e); + } catch (IOException e) { + logger.error("IP address " + ip + "of a host could not be determined, parser error " + e); + } + return jsonNode; + } + + /** + * 从ASN定位库获取相关信息 + * + * @param ip ip地址 + * @return ip对应ASN信息 + */ + private JsonNode getAsn(String ip) { + JsonNode jsonNode = null; + try { + InetAddress ipAddress = InetAddress.getByName(ip); + + if (IPUtil.isIPAddress(ip)) { + + if (IPUtil.isIPv4Address(ip)) { + jsonNode = asnV4Reader.get(ipAddress); + } else if (IPUtil.isIPv6Address(ip)) { + jsonNode = asnV6Reader.get(ipAddress); + } + } + } catch (RuntimeException e) { + logger.error("IP address :" + ip + ", parser error " + e); + } catch (IOException e) { + logger.error("IP address " + ip + "of a host could not be determined, parser error " + e); + } + return jsonNode; + } + + +} diff --git a/src/main/java/com/zdjizhi/tools/general/TransFormMap.java b/src/main/java/com/zdjizhi/tools/general/TransForm.java index 19df653..af218a8 100644 --- a/src/main/java/com/zdjizhi/tools/general/TransFormMap.java +++ b/src/main/java/com/zdjizhi/tools/general/TransForm.java @@ -1,9 +1,5 @@ package com.zdjizhi.tools.general; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.zdjizhi.utils.JsonMapper; import com.zdjizhi.tools.json.JsonParseUtil; import java.util.Map; @@ -14,42 +10,7 @@ import java.util.Map; * * @author qidaijie */ -public class TransFormMap { - private static final Log logger = LogFactory.get(); - - /** - * 解析日志,并补全 - * - * @param jsonMap kafka Topic消费原始日志并解析 - * @return 补全后的日志 - */ - @SuppressWarnings("unchecked") - public static String 
dealCommonMessage(Map<String, Object> jsonMap) { - try { - JsonParseUtil.dropJsonField(jsonMap); - for (String[] strings : JsonParseUtil.getJobList()) { - //该日志字段的值 - Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]); - //结果值映射到的日志字段key - String appendToKey = strings[1]; - //匹配操作函数的字段 - String function = strings[2]; - //额外的参数的值 - String param = strings[3]; - - //结果值映射到的日志字段原始value - Object appendToValue = JsonParseUtil.getValue(jsonMap, appendToKey); - - functionSet(function, jsonMap, appendToKey, appendToValue, logValue, param); - } - return JsonMapper.toJsonString(jsonMap); - } catch (RuntimeException e) { - logger.error("TransForm logs failed,The exception is :" + e); - return null; - } - } - - +public class TransForm { /** * 根据schema描述对应字段进行操作的 函数集合 * @@ -60,7 +21,7 @@ public class TransFormMap { * @param logValue 用到的参数的值 * @param param 额外的参数的值 */ - private static void functionSet(String function, Map<String, Object> jsonMap, String appendToKey, Object appendToValue, Object logValue, String param) { + public static void functionSet(String function, Map<String, Object> jsonMap, String appendToKey, Object appendToValue, Object logValue, String param,IpLookup ipLookup) { switch (function) { case "current_timestamp": if (!(appendToValue instanceof Long)) { @@ -72,17 +33,17 @@ public class TransFormMap { break; case "geo_ip_detail": if (logValue != null && appendToValue == null) { - JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoIpDetail(logValue.toString())); + JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoIpDetail(logValue.toString(),ipLookup)); } break; case "geo_asn": if (logValue != null && appendToValue == null) { - JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoAsn(logValue.toString())); + JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoAsn(logValue.toString(),ipLookup)); } break; case "geo_ip_country": if (logValue != null && appendToValue == null) { - JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoIpCountry(logValue.toString())); + JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoIpCountry(logValue.toString(),ipLookup)); } break; case "flattenSpec": diff --git a/src/main/java/com/zdjizhi/tools/general/TransFormTypeMap.java b/src/main/java/com/zdjizhi/tools/general/TransFormTypeMap.java deleted file mode 100644 index 4bc2f36..0000000 --- a/src/main/java/com/zdjizhi/tools/general/TransFormTypeMap.java +++ /dev/null @@ -1,134 +0,0 @@ -package com.zdjizhi.tools.general; - - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.tools.json.JsonParseUtil; - -import java.util.Map; - - -/** - * 描述:转换或补全工具类 - * - * @author qidaijie - */ -public class TransFormTypeMap { - private static final Log logger = LogFactory.get(); - - /** - * 解析日志,并补全 - * - * @param message kafka Topic原始日志 - * @return 补全后的日志 - */ - @SuppressWarnings("unchecked") - public static String dealCommonMessage(Map<String, Object> message) { - try { - Map<String, Object> jsonMap = JsonParseUtil.typeTransform(message); - for (String[] strings : JsonParseUtil.getJobList()) { - //该日志字段的值 - Object logValue = JsonParseUtil.getValue(jsonMap, strings[0]); - //结果值映射到的日志字段key - String appendToKey = strings[1]; - //匹配操作函数的字段 - String function = strings[2]; - //额外的参数的值 - String param = strings[3]; - - //结果值映射到的日志字段原始value - Object appendToValue = JsonParseUtil.getValue(jsonMap, appendToKey); - - 
functionSet(function, jsonMap, appendToKey, appendToValue, logValue, param); - } - return JsonMapper.toJsonString(jsonMap); - } catch (RuntimeException e) { - logger.error("TransForm logs failed,The exception is :" + e); - e.printStackTrace(); - return null; - } - } - - - /** - * 根据schema描述对应字段进行操作的 函数集合 - * - * @param function 匹配操作函数的字段 - * @param jsonMap 原始日志解析map - * @param appendToKey 需要补全的字段的key - * @param appendToValue 需要补全的字段的值 - * @param logValue 用到的参数的值 - * @param param 额外的参数的值 - */ - private static void functionSet(String function, Map<String, Object> jsonMap, String appendToKey, Object appendToValue, Object logValue, String param) { - switch (function) { - case "current_timestamp": - if (!(appendToValue instanceof Long)) { - JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getCurrentTime()); - } - break; - case "snowflake_id": - JsonParseUtil.setValue(jsonMap, appendToKey, SnowflakeId.generateId()); - break; - case "geo_ip_detail": - if (logValue != null && appendToValue == null) { - JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoIpDetail(logValue.toString())); - } - break; - case "geo_asn": - if (logValue != null && appendToValue == null) { - JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoAsn(logValue.toString())); - } - break; - case "geo_ip_country": - if (logValue != null && appendToValue == null) { - JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getGeoIpCountry(logValue.toString())); - } - break; - case "flattenSpec": - if (logValue != null && param != null) { - JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.flattenSpec(logValue.toString(), param)); - } - break; - case "if": - if (param != null) { - JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.condition(jsonMap, param)); - } - break; - case "decode_of_base64": - if (logValue != null) { - JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.decodeBase64(logValue.toString(), TransFunction.isJsonValue(jsonMap, param))); - } - break; - case "sub_domain": - if (appendToValue == null && logValue != null) { - JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.getTopDomain(logValue.toString())); - } - break; - case "radius_match": - if (logValue != null) { - JsonParseUtil.setValue(jsonMap, appendToKey, TransFunction.radiusMatch(jsonMap, logValue.toString())); - } - break; - case "gtpc_match": - if (logValue != null) { - TransFunction.gtpcMatch(jsonMap, logValue.toString(), appendToKey, param); - } - break; - case "set_value": - if (param != null) { - JsonParseUtil.setValue(jsonMap, appendToKey, param); - } - break; - case "get_value": - if (logValue != null) { - JsonParseUtil.setValue(jsonMap, appendToKey, logValue); - } - break; - default: - } - } - -}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/tools/general/TransFunction.java b/src/main/java/com/zdjizhi/tools/general/TransFunction.java index 000ecbb..f5f10e5 100644 --- a/src/main/java/com/zdjizhi/tools/general/TransFunction.java +++ b/src/main/java/com/zdjizhi/tools/general/TransFunction.java @@ -7,7 +7,6 @@ import com.jayway.jsonpath.JsonPath; import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.tools.hbase.HBaseUtils; import com.zdjizhi.utils.FormatUtils; -import com.zdjizhi.utils.IpLookupV2; import com.zdjizhi.utils.StringUtil; import com.zdjizhi.tools.json.JsonParseUtil; import com.zdjizhi.tools.json.JsonPathUtil; @@ -25,18 +24,6 @@ class TransFunction { private static final Log logger = LogFactory.get(); /** - * IP定位库工具类 - */ - private static IpLookupV2 ipLookup = new IpLookupV2.Builder(false) - .loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_built_in.mmdb") - .loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_built_in.mmdb") - .loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_user_defined.mmdb") - .loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_user_defined.mmdb") - .loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb") - .loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb") - .build(); - - /** * 生成当前时间戳的操作 */ static long getCurrentTime() { @@ -50,7 +37,7 @@ class TransFunction { * @param ip client IP * @return ip地址详细信息 */ - static String getGeoIpDetail(String ip) { + static String getGeoIpDetail(String ip,IpLookup ipLookup) { String detail = ""; try { detail = ipLookup.cityLookupDetail(ip); @@ -68,7 +55,7 @@ class TransFunction { * @param ip client/server IP * @return ASN */ - static String getGeoAsn(String ip) { + static String getGeoAsn(String ip,IpLookup ipLookup) { String asn = ""; try { asn = ipLookup.asnLookup(ip); @@ -86,7 +73,7 @@ class TransFunction { * @param ip server IP * @return 国家 */ - static String getGeoIpCountry(String ip) { + static String getGeoIpCountry(String ip,IpLookup ipLookup) { String country = ""; try { country = ipLookup.countryLookup(ip); @@ -98,7 +85,6 @@ class TransFunction { return country; } - /** * radius借助HBase补齐 * @@ -117,7 +103,7 @@ class TransFunction { /** * 借助HBase补齐GTP-C信息,解析tunnels信息,优先使用gtp_uplink_teid,其次使用gtp_downlink_teid * <p> - * "common_tunnels":[{"tunnels_schema_type":"GTP","gtp_uplink_teid":235261261,"gtp_downlink_teid":665547833,"gtp_sgw_ip":"192.56.5.2","gtp_pgw_ip":"192.56.10.20","gtp_sgw_port":2152,"gtp_pgw_port":2152}] + * "common_tunnels":[{"tunnels_schema_type":"GTP","gtp_endpoint_a2b_teid":235261261,"gtp_endpoint_b2a_teid":665547833,"gtp_sgw_ip":"192.56.5.2","gtp_pgw_ip":"192.56.10.20","gtp_sgw_port":2152,"gtp_pgw_port":2152}] * * @param jsonMap 原始日志json * @param logValue 上行TEID diff --git a/src/main/java/com/zdjizhi/tools/hdfs/HdfsFileUtils.java b/src/main/java/com/zdjizhi/tools/hdfs/HdfsFileUtils.java new file mode 100644 index 0000000..6c56b77 --- /dev/null +++ b/src/main/java/com/zdjizhi/tools/hdfs/HdfsFileUtils.java @@ -0,0 +1,78 @@ +package com.zdjizhi.tools.hdfs; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.FlowWriteConfig; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + +/** + * @author qidaijie + * @Package 
com.zdjizhi.tools.hdfs + * @Description: + * @date 2022/11/217:57 + */ +public class HdfsFileUtils { + private static final Log logger = LogFactory.get(); + + + private static HdfsFileUtils hdfsFileUtils; + private static FileSystem fileSystem; + + static { + if (FlowWriteConfig.FILE_SYSTEM_TYPE.equals(FlowWriteConfig.KNOWLEDGEBASE_FILE_SYSTEM_TYPE)) { + Configuration configuration = new Configuration(); + try { + //创建fileSystem,用于连接hdfs + fileSystem = FileSystem.get(new URI(FlowWriteConfig.HDFS_URI), configuration); + } catch (IOException e) { + throw new RuntimeException(e); + } catch (URISyntaxException e) { + e.printStackTrace(); + } + } + } + + /** + * 下载HDFS文件 + * + * @param filePath 文件路径 + * @return 文件 + * @throws IOException + */ + public static byte[] downloadFileByBytes(String filePath) { + try (FSDataInputStream open = fileSystem.open(new Path(filePath))) { + byte[] bytes = new byte[open.available()]; + open.read(0, bytes, 0, open.available()); + return bytes; + } catch (IOException e) { + logger.error("An I/O exception when files are download from HDFS. Message is :" + e.getMessage()); + } + return null; + } + + /** + * 更新文件到HDFS + * + * @param filePath 文件路径 + * @param bytes 文件 + * @throws IOException + */ + public static void uploadFileByBytes(String filePath, byte[] bytes) { + try (FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path(filePath), true)) { + fsDataOutputStream.write(bytes); + } catch (RuntimeException e) { + logger.error("Uploading files to the HDFS is abnormal. Message is :" + e.getMessage()); + } catch (IOException e) { + logger.error("An I/O exception when files are uploaded to HDFS. Message is :" + e.getMessage()); + } + } +} + diff --git a/src/main/java/com/zdjizhi/tools/http/HttpClientUtils.java b/src/main/java/com/zdjizhi/tools/http/HttpClientUtils.java new file mode 100644 index 0000000..4c4f0ea --- /dev/null +++ b/src/main/java/com/zdjizhi/tools/http/HttpClientUtils.java @@ -0,0 +1,320 @@ +package com.zdjizhi.tools.http; + + +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.StringUtil; +import org.apache.commons.io.IOUtils; +import org.apache.http.*; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.HttpRequestRetryHandler; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.conn.ConnectTimeoutException; +import org.apache.http.conn.ConnectionKeepAliveStrategy; +import org.apache.http.conn.HttpHostConnectException; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.message.BasicHeaderElementIterator; +import org.apache.http.protocol.HTTP; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLException; +import javax.net.ssl.SSLHandshakeException; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.net.URI; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.Map; + + +public class HttpClientUtils { + /** + * 全局连接池对象 + */ + 
private static final PoolingHttpClientConnectionManager CONN_MANAGER = new PoolingHttpClientConnectionManager(); + + private static Logger logger = LoggerFactory.getLogger(HttpClientUtils.class); + public static final String ERROR_MESSAGE = "-1"; + + /* + * 静态代码块配置连接池信息 + */ + static { + // 设置最大连接数 + CONN_MANAGER.setMaxTotal(FlowWriteConfig.HTTP_POOL_MAX_CONNECTION); + // 设置每个连接的路由数 + CONN_MANAGER.setDefaultMaxPerRoute(FlowWriteConfig.HTTP_POOL_MAX_PER_ROUTE); + + } + + /** + * 获取Http客户端连接对象 + * + * @return Http客户端连接对象 + */ + private static CloseableHttpClient getHttpClient() { + // 创建Http请求配置参数 + RequestConfig requestConfig = RequestConfig.custom() + // 获取连接超时时间 + .setConnectionRequestTimeout(FlowWriteConfig.HTTP_POOL_REQUEST_TIMEOUT) + // 请求超时时间 + .setConnectTimeout(FlowWriteConfig.HTTP_POOL_CONNECT_TIMEOUT) + // 响应超时时间 + .setSocketTimeout(FlowWriteConfig.HTTP_POOL_RESPONSE_TIMEOUT) + .build(); + + /* + * 测出超时重试机制为了防止超时不生效而设置 + * 如果直接放回false,不重试 + * 这里会根据情况进行判断是否重试 + */ + HttpRequestRetryHandler retry = (exception, executionCount, context) -> { + if (executionCount >= 3) {// 如果已经重试了3次,就放弃 + return false; + } + if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试 + return true; + } + if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常 + return false; + } + if (exception instanceof UnknownHostException) {// 目标服务器不可达 + return false; + } + if (exception instanceof ConnectTimeoutException) {// 连接被拒绝 + return false; + } + if (exception instanceof HttpHostConnectException) {// 连接被拒绝 + return false; + } + if (exception instanceof SSLException) {// ssl握手异常 + return false; + } + if (exception instanceof InterruptedIOException) {// 超时 + return true; + } + HttpClientContext clientContext = HttpClientContext.adapt(context); + HttpRequest request = clientContext.getRequest(); + // 如果请求是幂等的,就再次尝试 + return !(request instanceof HttpEntityEnclosingRequest); + }; + + + ConnectionKeepAliveStrategy myStrategy = (response, context) -> { + HeaderElementIterator it = new BasicHeaderElementIterator + (response.headerIterator(HTTP.CONN_KEEP_ALIVE)); + while (it.hasNext()) { + HeaderElement he = it.nextElement(); + String param = he.getName(); + String value = he.getValue(); + if (value != null && "timeout".equalsIgnoreCase(param)) { + return Long.parseLong(value) * 1000; + } + } + return 60 * 1000;//如果没有约定,则默认定义时长为60s + }; + + // 创建httpClient + return HttpClients.custom() + // 把请求相关的超时信息设置到连接客户端 + .setDefaultRequestConfig(requestConfig) + // 把请求重试设置到连接客户端 + .setRetryHandler(retry) + .setKeepAliveStrategy(myStrategy) + // 配置连接池管理对象 + .setConnectionManager(CONN_MANAGER) + .build(); + } + + + /** + * GET请求 + * + * @param uri 请求地 + * @return message + */ + public static String httpGet(URI uri, Header... 
headers) { + String msg = ERROR_MESSAGE; + + // 获取客户端连接对象 + CloseableHttpClient httpClient = getHttpClient(); + CloseableHttpResponse response = null; + + try { + logger.info("http get uri {}", uri); + // 创建GET请求对象 + HttpGet httpGet = new HttpGet(uri); + + if (StringUtil.isNotEmpty(headers)) { + for (Header h : headers) { + httpGet.addHeader(h); + logger.info("request header : {}", h); + } + } + // 执行请求 + response = httpClient.execute(httpGet); + int statusCode = response.getStatusLine().getStatusCode(); + // 获取响应实体 + HttpEntity entity = response.getEntity(); + // 获取响应信息 + msg = EntityUtils.toString(entity, "UTF-8"); + + if (statusCode != HttpStatus.SC_OK) { + logger.error("Http get content is :{}", msg); + } + + } catch (ClientProtocolException e) { + logger.error("协议错误: {}", e.getMessage()); + } catch (ParseException e) { + logger.error("解析错误: {}", e.getMessage()); + } catch (IOException e) { + logger.error("IO错误: {}", e.getMessage()); + } finally { + if (null != response) { + try { + EntityUtils.consume(response.getEntity()); + response.close(); + } catch (IOException e) { + logger.error("释放链接错误: {}", e.getMessage()); + + } + } + } + + return msg; + } + + /** + * POST 请求 + * + * @param uri uri参数 + * @param requestBody 请求体 + * @return post请求返回结果 + */ + public static String httpPost(URI uri, String requestBody, Header... headers) { + String msg = ERROR_MESSAGE; + // 获取客户端连接对象 + CloseableHttpClient httpClient = getHttpClient(); + + // 创建POST请求对象 + CloseableHttpResponse response = null; + try { + + logger.info("http post uri:{}, http post body:{}", uri, requestBody); + + HttpPost httpPost = new HttpPost(uri); + httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded"); + if (StringUtil.isNotEmpty(headers)) { + for (Header h : headers) { + httpPost.addHeader(h); + logger.info("request header : {}", h); + } + } + + if (StringUtil.isNotBlank(requestBody)) { + byte[] bytes = requestBody.getBytes(StandardCharsets.UTF_8); + httpPost.setEntity(new ByteArrayEntity(bytes)); + } + + response = httpClient.execute(httpPost); + int statusCode = response.getStatusLine().getStatusCode(); + // 获取响应实体 + HttpEntity entity = response.getEntity(); + // 获取响应信息 + msg = EntityUtils.toString(entity, "UTF-8"); + + if (statusCode != HttpStatus.SC_OK) { + logger.error("Http post content is :{}", msg); + } + } catch (ClientProtocolException e) { + logger.error("协议错误: {}", e.getMessage()); + } catch (ParseException e) { + logger.error("解析错误: {}", e.getMessage()); + } catch (IOException e) { + logger.error("IO错误: {}", e.getMessage()); + } finally { + if (null != response) { + try { + EntityUtils.consumeQuietly(response.getEntity()); + response.close(); + } catch (IOException e) { + logger.error("释放链接错误: {}", e.getMessage()); + + } + } + } + return msg; + } + + /** + * 拼装url + * url ,参数map + */ + public static void setUrlWithParams(URIBuilder uriBuilder, String path, Map<String, Object> params) { + try { + uriBuilder.setPath(path); + if (params != null && !params.isEmpty()) { + for (Map.Entry<String, Object> kv : params.entrySet()) { + uriBuilder.setParameter(kv.getKey(), kv.getValue().toString()); + } + } + } catch (Exception e) { + logger.error("拼接url出错,uri : {}, path : {},参数: {}", uriBuilder.toString(), path, params); + } + } + + + public InputStream httpGetInputStream(String url, int socketTimeout, Header... 
headers) { + InputStream result = null; + // 获取客户端连接对象 + CloseableHttpClient httpClient = getHttpClient(); + // 创建GET请求对象 + HttpGet httpGet = new HttpGet(url); + if (StringUtil.isNotEmpty(headers)) { + for (Header h : headers) { + httpGet.addHeader(h); + } + } + CloseableHttpResponse response = null; + + try { + // 执行请求 + response = httpClient.execute(httpGet); + // 获取响应实体 + result = IOUtils.toBufferedInputStream(response.getEntity().getContent()); + // 获取响应信息 + EntityUtils.consume(response.getEntity()); + } catch (ClientProtocolException e) { + logger.error("current file: {},Protocol error:{}", url, e.getMessage()); + + } catch (ParseException e) { + logger.error("current file: {}, Parser error:{}", url, e.getMessage()); + + } catch (IOException e) { + logger.error("current file: {},IO error:{}", url, e.getMessage()); + + } finally { + if (null != response) { + try { + EntityUtils.consume(response.getEntity()); + response.close(); + } catch (IOException e) { + logger.error("Release Connection error:{}", e.getMessage()); + + } + } + } + return result; + } + +}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java index 59365be..002b117 100644 --- a/src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java +++ b/src/main/java/com/zdjizhi/tools/json/JsonParseUtil.java @@ -9,7 +9,6 @@ import com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.config.listener.Listener; import com.alibaba.nacos.api.exception.NacosException; -import com.jayway.jsonpath.JsonPath; import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.utils.StringUtil; @@ -55,8 +54,8 @@ public class JsonParseUtil { propNacos.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN); try { ConfigService configService = NacosFactory.createConfigService(propNacos); - String dataId = FlowWriteConfig.NACOS_DATA_ID; - String group = FlowWriteConfig.NACOS_GROUP; + String dataId = FlowWriteConfig.NACOS_SCHEMA_DATA_ID; + String group = FlowWriteConfig.NACOS_SCHEMA_GROUP; String schema = configService.getConfig(dataId, group, 5000); if (StringUtil.isNotBlank(schema)) { jsonFieldsMap = getFieldsFromSchema(schema); @@ -263,7 +262,6 @@ public class JsonParseUtil { dropList.add(filedStr); } } - System.out.println(map.toString()); return map; } diff --git a/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java index ec75b8a..3933ee8 100644 --- a/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java +++ b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java @@ -34,7 +34,7 @@ public class KafkaConsumer { * @return kafka logs -> map */ @SuppressWarnings("unchecked") - public static FlinkKafkaConsumer<Map<String, Object>> myDeserializationConsumer() { + public static FlinkKafkaConsumer<Map<String, Object>> timestampDeserializationConsumer() { FlinkKafkaConsumer<Map<String, Object>> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC, new TimestampDeserializationSchema(), createConsumerConfig()); diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java index 5581137..685cc0a 100644 --- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java +++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java @@ -2,17 +2,21 @@ package com.zdjizhi.topology; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.zdjizhi.common.CustomFile; import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.tools.functions.FilterNullFunction; -import com.zdjizhi.tools.functions.MapCompletedFunction; -import com.zdjizhi.tools.functions.TypeMapCompletedFunction; +import com.zdjizhi.tools.functions.map.MapCompleted; +import com.zdjizhi.tools.functions.map.TypeMapCompleted; +import com.zdjizhi.tools.functions.broadcast.KnowledgeBaseSource; +import com.zdjizhi.tools.functions.filter.FilterNull; import com.zdjizhi.tools.kafka.KafkaConsumer; import com.zdjizhi.tools.kafka.KafkaProducer; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.streaming.api.datastream.*; import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import java.util.List; import java.util.Map; /** @@ -29,43 +33,56 @@ public class LogFlowWriteTopology { //两个输出之间的最大时间 (单位milliseconds) environment.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT); - if (FlowWriteConfig.LOG_NEED_COMPLETE == 1) { - SingleOutputStreamOperator<Map<String, Object>> streamSource = environment.addSource(KafkaConsumer.myDeserializationConsumer()) - .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM).name(FlowWriteConfig.SOURCE_KAFKA_TOPIC); + //广播流,只开一个并行度 + DataStreamSource<List<CustomFile>> customFileDataStreamSource = environment.addSource(new KnowledgeBaseSource()) + .setParallelism(1); + + //用于存储广播数据的state + MapStateDescriptor<String, List<CustomFile>> descriptor = new MapStateDescriptor<>("descriptor", Types.STRING, Types.LIST(TypeInformation.of(CustomFile.class))); + + //将该流广播赋给缓存 + BroadcastStream<List<CustomFile>> broadcast = customFileDataStreamSource.broadcast(descriptor); - DataStream<String> cleaningLog; + //业务流连接广播流 + BroadcastConnectedStream<Map<String, Object>, List<CustomFile>> connect = environment.addSource(KafkaConsumer.timestampDeserializationConsumer()) + .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM) + .name(FlowWriteConfig.SOURCE_KAFKA_TOPIC) + .connect(broadcast); + + //日志清洗 + SingleOutputStreamOperator<String> completed; switch (FlowWriteConfig.LOG_TRANSFORM_TYPE) { case 0: //对原始日志进行处理补全转换等,不对日志字段类型做校验。 - cleaningLog = streamSource.map(new MapCompletedFunction()).name("MapCompletedFunction") + completed = connect.process(new MapCompleted()) + .name("MapCompletedFunction") .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); break; case 1: //对原始日志进行处理补全转换等,对日志字段类型做若校验,可根据schema进行强转。 - cleaningLog = streamSource.map(new TypeMapCompletedFunction()).name("TypeMapCompletedFunction") + completed = connect.process(new TypeMapCompleted()) + .name("TypeMapCompletedFunction") .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); break; default: - //对原始日志进行处理补全转换等,不对日志字段类型做校验。 - cleaningLog = streamSource.map(new MapCompletedFunction()).name("MapCompletedFunction") + //对原始日志进行处理补全转换等,对日志字段类型做若校验,可根据schema进行强转。 + completed = connect.process(new TypeMapCompleted()) + .name("TypeMapCompletedFunction") .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); } - //过滤空数据不发送到Kafka内 - DataStream<String> result = cleaningLog.filter(new FilterNullFunction()).name("FilterAbnormalData") - .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); - //发送数据到Kafka - result.addSink(KafkaProducer.getKafkaProducer()).name(FlowWriteConfig.SINK_KAFKA_TOPIC) + completed.addSink(KafkaProducer.getKafkaProducer()).name(FlowWriteConfig.SINK_KAFKA_TOPIC) .setParallelism(FlowWriteConfig.SINK_PARALLELISM); } else { + //不进行任何处理,仅发送到kafka内。 DataStreamSource<String> streamSource = environment.addSource(KafkaConsumer.flinkConsumer()) .setParallelism(FlowWriteConfig.SOURCE_PARALLELISM); //过滤空数据不发送到Kafka内 - DataStream<String> result = streamSource.filter(new FilterNullFunction()).name("FilterOriginalData") + DataStream<String> result = streamSource.filter(new FilterNull()).name("FilterOriginalData") .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM); //发送数据到Kafka diff --git a/src/main/logback.xml b/src/main/logback.xml deleted file mode 100644 index 59095f6..0000000 --- a/src/main/logback.xml +++ /dev/null @@ -1,42 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<configuration> - - <!-- 格式化输出:%date表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度 %msg:日志消息,%n是换行符--> - <property name="LOG_PATTERN" 
value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" /> - <!-- 定义日志存储的路径,不要配置相对路径 --> - <property name="LOG_FILE_PATH" value="E:/logs/demo.%d{yyyy-MM-dd}.%i.log" /> - - <!-- 控制台输出日志 --> - <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <!-- 按照上面配置的LOG_PATTERN来打印日志 --> - <pattern>${LOG_PATTERN}</pattern> - </encoder> - </appender> - - <!--每天生成一个日志文件,保存30天的日志文件。rollingFile是用来切分文件的 --> - <appender name="FILE" - class="ch.qos.logback.core.rolling.RollingFileAppender"> - <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> - <fileNamePattern>${LOG_FILE_PATH}</fileNamePattern> - <!-- keep 15 days' worth of history --> - <maxHistory>30</maxHistory> - <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"> - <!-- 日志文件的最大大小 --> - <maxFileSize>20MB</maxFileSize> - </timeBasedFileNamingAndTriggeringPolicy> - </rollingPolicy> - - <encoder> - <pattern>${LOG_PATTERN}</pattern> - </encoder> - </appender> - <!-- project default level项目输出的日志级别 --> - <logger name="com.example.demo" level="INFO" /> - - <!-- 日志输出级别 常用的日志级别按照从高到低依次为:ERROR、WARN、INFO、DEBUG。 --> - <root level="INFO"> - <appender-ref ref="CONSOLE" /> - <appender-ref ref="FILE" /><!--对应appender name="FILE"。 --> - </root> -</configuration>
\ No newline at end of file diff --git a/src/test/java/com/zdjizhi/EncryptorTest.java b/src/test/java/com/zdjizhi/EncryptorTest.java deleted file mode 100644 index 9bd8e71..0000000 --- a/src/test/java/com/zdjizhi/EncryptorTest.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.zdjizhi; - -import org.jasypt.encryption.pbe.StandardPBEStringEncryptor; -import org.junit.Test; - -/** - * @author qidaijie - * @Package com.zdjizhi - * @Description: - * @date 2022/3/1610:55 - */ -public class EncryptorTest { - - - @Test - public void passwordTest(){ - StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor(); - // 配置加密解密的密码/salt值 - encryptor.setPassword("galaxy"); - // 对"raw_password"进行加密:S5kR+Y7CI8k7MaecZpde25yK8NKUnd6p - String pin = "galaxy2019"; - String encPin = encryptor.encrypt(pin); - String user = "admin"; - String encUser = encryptor.encrypt(user); - System.out.println(encPin); - System.out.println(encUser); - // 再进行解密:raw_password - String rawPwd = encryptor.decrypt("ENC(6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ)"); - String rawUser = encryptor.decrypt("ENC(nnasyGpHKGFA4KW0zro9MDdw==)"); - - System.out.println("The username is: "+rawPwd); - System.out.println("The pin is: "+rawUser); - } - -} diff --git a/src/test/java/com/zdjizhi/FunctionTest.java b/src/test/java/com/zdjizhi/FunctionTest.java deleted file mode 100644 index c667224..0000000 --- a/src/test/java/com/zdjizhi/FunctionTest.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.zdjizhi; - -import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.utils.IpLookupV2; -import com.zdjizhi.utils.general.CityHash; -import org.junit.Test; - -import java.math.BigInteger; -import java.util.Calendar; - -/** - * @author qidaijie - * @Package com.zdjizhi - * @Description: - * @date 2021/11/611:38 - */ -public class FunctionTest { - - private static IpLookupV2 ipLookup = new IpLookupV2.Builder(false) - .loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4.mmdb") -// .loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_built_in.mmdb") -// .loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4_user_defined.mmdb") -// .loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6_user_defined.mmdb") - .loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb") - .loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb") - .build(); - - @Test - public void CityHashTest() { - - byte[] dataBytes = String.valueOf(613970406986188816L).getBytes(); - long hashValue = CityHash.CityHash64(dataBytes, 0, dataBytes.length); - String decimalValue = Long.toUnsignedString(hashValue, 10); - BigInteger result = new BigInteger(decimalValue); - System.out.println(result); - } - - @Test - public void ipLookupTest() { - String ip = "0.255.255.254"; - System.out.println(ipLookup.cityLookupDetail(ip)); - System.out.println(ipLookup.countryLookup(ip)); - } - - @Test - public void timestampTest(){ - Calendar cal = Calendar.getInstance(); - Long utcTime=cal.getTimeInMillis(); - System.out.println(utcTime); - System.out.println(System.currentTimeMillis()); - } -} diff --git a/src/test/java/com/zdjizhi/HBaseTest.java b/src/test/java/com/zdjizhi/HBaseTest.java deleted file mode 100644 index 5f94e32..0000000 --- a/src/test/java/com/zdjizhi/HBaseTest.java +++ /dev/null @@ -1,54 +0,0 @@ -package com.zdjizhi; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.TableName; -import 
org.apache.hadoop.hbase.client.*; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.Test; - -import java.io.IOException; -import java.util.Arrays; - -/** - * @author qidaijie - * @Package com.zdjizhi - * @Description: - * @date 2021/12/310:42 - */ -public class HBaseTest { - - @Test - public void getColumn() { - // 管理Hbase的配置信息 - Configuration configuration = HBaseConfiguration.create(); - // 设置zookeeper节点 - configuration.set("hbase.zookeeper.quorum", "192.168.44.11:2181"); - configuration.set("hbase.client.retries.number", "3"); - configuration.set("hbase.bulkload.retries.number", "3"); - configuration.set("zookeeper.recovery.retry", "3"); - try { - Connection connection = ConnectionFactory.createConnection(configuration); - Table table = connection.getTable(TableName.valueOf("tsg_galaxy:relation_framedip_account")); - Scan scan2 = new Scan(); - ResultScanner scanner = table.getScanner(scan2); - for (Result result : scanner) { - int acctStatusType; - boolean hasType = result.containsColumn(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")); - if (hasType) { - acctStatusType = Bytes.toInt(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"))); - } else { - acctStatusType = 3; - } - String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))); - String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account"))); - System.out.println("status" + acctStatusType + "key:" + framedIp + "value:" + account); -// System.out.println(Arrays.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")))); - } - } catch (IOException e) { - e.printStackTrace(); - } - } -} diff --git a/src/test/java/com/zdjizhi/function/Base64Test.java b/src/test/java/com/zdjizhi/function/Base64Test.java new file mode 100644 index 0000000..891b5af --- /dev/null +++ b/src/test/java/com/zdjizhi/function/Base64Test.java @@ -0,0 +1,77 @@ +package com.zdjizhi.function; + +import cn.hutool.core.codec.Base64; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.utils.StringUtil; +import org.junit.Test; + +import java.io.UnsupportedEncodingException; +import java.util.Arrays; + +/** + * @author qidaijie + * @Package com.zdjizhi.function + * @Description: + * @date 2022/11/39:36 + */ +public class Base64Test { + private static final Log logger = LogFactory.get(); + + /** + * 根据编码解码base64(hutool) + + * @return 解码字符串 + */ + @Test + public void decodeBase64Hutool() { + try { + System.out.println(Base64.decodeStr("bWFpbF90ZXN0X2VuZ2xpc2gudHh0")); + System.out.println(Base64.decodeStr("aGVsbG8=")); + } catch (RuntimeException e) { + logger.error("Resolve Base64 exception, exception information:" + e.getMessage()); + e.printStackTrace(); + } + } + + + /** + * 根据编码解码base64 + + * @return 解码字符串 + */ + @Test + public void encodeBase64() { + try { + System.out.println(java.util.Base64.getUrlEncoder().encodeToString("runoob?java8".getBytes("ISO-8859-1"))); + System.out.println(java.util.Base64.getUrlEncoder().encodeToString("runoob?java8".getBytes("utf-8"))); + + } catch (RuntimeException e) { + logger.error("Resolve Base64 exception, exception information:" + e.getMessage()); + e.printStackTrace(); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + } + + /** + * 根据编码解码base64 + + * @return 解码字符串 + */ + @Test + public void decodeBase64() { + try { + byte[] base64decodedBytes = java.util.Base64.getDecoder().decode("bWFpbF90ZXN0X2VuZ2xpc2gudHh0"); + + 
System.out.println("原始字符串: " + new String(base64decodedBytes, "utf-8")); + System.out.println("原始字符串: " + new String(base64decodedBytes)); + + } catch (RuntimeException e) { + logger.error("Resolve Base64 exception, exception information:" + e.getMessage()); + e.printStackTrace(); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + } +} diff --git a/src/test/java/com/zdjizhi/function/EncryptorTest.java b/src/test/java/com/zdjizhi/function/EncryptorTest.java new file mode 100644 index 0000000..d00a62f --- /dev/null +++ b/src/test/java/com/zdjizhi/function/EncryptorTest.java @@ -0,0 +1,38 @@ +package com.zdjizhi.function; + +import org.jasypt.encryption.pbe.StandardPBEStringEncryptor; +import org.junit.Test; + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2022/3/1610:55 + */ +public class EncryptorTest { + + + @Test + public void passwordTest(){ + StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor(); + // 配置加密解密的密码/salt值 + encryptor.setPassword("galaxy"); + // 对"raw_password"进行加密:S5kR+Y7CI8k7MaecZpde25yK8NKUnd6p + String kafkaUser = encryptor.encrypt("admin"); + String kafkaPin = encryptor.encrypt("galaxy2019"); + String nacosPin = encryptor.encrypt("nacos"); + String nacosUser = encryptor.encrypt("nacos"); + + System.out.println("Kafka:\n"+"The username is: "+kafkaUser); + System.out.println("The pin is: "+kafkaPin); + System.out.println("Nacos:\n"+"The username is: "+nacosUser); + System.out.println("The pin is: "+nacosPin); + // 再进行解密:raw_password + System.out.println("Kafka:\n"+"The username is: "+encryptor.decrypt(kafkaUser)); + System.out.println("The pin is: "+encryptor.decrypt(kafkaPin)); + + System.out.println("Nacos:\n"+"The username is: "+encryptor.decrypt(nacosUser)); + System.out.println("The pin is: "+encryptor.decrypt(nacosPin)); + } + +} diff --git a/src/test/java/com/zdjizhi/function/HBaseTest.java b/src/test/java/com/zdjizhi/function/HBaseTest.java new file mode 100644 index 0000000..bfdcdf9 --- /dev/null +++ b/src/test/java/com/zdjizhi/function/HBaseTest.java @@ -0,0 +1,245 @@ +package com.zdjizhi.function; + +import cn.hutool.json.JSONObject; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.tools.hbase.HBaseUtils; +import com.zdjizhi.utils.StringUtil; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import scala.Int; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2021/12/310:42 + */ +public class HBaseTest { + private static final Log logger = LogFactory.get(); + private static Map<String, String> radiusMap = new ConcurrentHashMap<>(16); + + private static Map<String,HashMap<String, Object>> gtpcMap = new ConcurrentHashMap<>(16); + + @Test + public void getColumn() { + // 管理Hbase的配置信息 + Configuration configuration = HBaseConfiguration.create(); + // 设置zookeeper节点 + configuration.set("hbase.zookeeper.quorum", "192.168.44.111:2181"); + configuration.set("hbase.client.retries.number", "1"); + configuration.set("hbase.client.pause", "50"); + 
configuration.set("hbase.rpc.timeout", "3000"); + configuration.set("zookeeper.recovery.retry", "1"); + configuration.set("zookeeper.recovery.retry.intervalmill", "200"); + try { + System.out.println(System.currentTimeMillis()); + Connection connection = ConnectionFactory.createConnection(configuration); + Table table = connection.getTable(TableName.valueOf("tsg_galaxy:relation_framedip_account")); + Scan scan2 = new Scan(); + ResultScanner scanner = table.getScanner(scan2); + for (Result result : scanner) { + int acctStatusType; + boolean hasType = result.containsColumn(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")); + if (hasType) { + acctStatusType = Bytes.toInt(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"))); + } else { + acctStatusType = 3; + } + String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))); + String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account"))); + System.out.println("status" + acctStatusType + "key:" + framedIp + "value:" + account); + } + } catch (IOException e) { + e.printStackTrace(); + }finally { + System.out.println(System.currentTimeMillis()); + } + } + + + @Test + public void getGtpcData() { + // 管理Hbase的配置信息 + Configuration configuration = HBaseConfiguration.create(); + // 设置zookeeper节点 + configuration.set("hbase.zookeeper.quorum", "192.168.44.11:2181"); + configuration.set("hbase.client.retries.number", "1"); + configuration.set("hbase.client.pause", "50"); + configuration.set("hbase.rpc.timeout", "3000"); + configuration.set("zookeeper.recovery.retry", "1"); + configuration.set("zookeeper.recovery.retry.intervalmill", "200"); + long begin = System.currentTimeMillis(); + ResultScanner scanner = null; + try { + Connection connection = ConnectionFactory.createConnection(configuration); + Table table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_GTPC_TABLE_NAME)); + Scan scan2 = new Scan(); + scanner = table.getScanner(scan2); + for (Result result : scanner) { + String upLinkTeid = getTeid(result, "uplink_teid"); + String downLinkTeid = getTeid(result, "downlink_teid"); + String phoneNumber = getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "phone_number").trim(); + String imsi = getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imsi").trim(); + String imei = getString(result, FlowWriteConfig.GTPC_FAMILY_NAME, "imei").trim(); + Long lastUpdateTime = getLong(result, FlowWriteConfig.GTPC_FAMILY_NAME, "last_update_time"); + + HashMap<String, Object> buildUserData = buildUserData(phoneNumber, imsi, imei, lastUpdateTime); + + if (FlowWriteConfig.DEFAULT_RELATIONSHIP_MODULE.equals(FlowWriteConfig.DATA_RELATIONSHIP_MODEL)) { + String vsysId = getVsysId(result).trim(); + updateCache(gtpcMap, upLinkTeid+vsysId, buildUserData, lastUpdateTime); + updateCache(gtpcMap, downLinkTeid+vsysId, buildUserData, lastUpdateTime); + } else { + updateCache(gtpcMap, upLinkTeid, buildUserData, lastUpdateTime); + updateCache(gtpcMap, downLinkTeid, buildUserData, lastUpdateTime); + } + } + logger.warn("The obtain the number of GTP-C relationships : " + gtpcMap.size()); + logger.warn("The time spent to obtain GTP-C relationships : " + (System.currentTimeMillis() - begin)); + } catch (IOException | RuntimeException e) { + logger.error("The relationship between USER and TEID obtained from HBase is abnormal! 
message is :" + e); + e.printStackTrace(); + } finally { + if (scanner != null) { + scanner.close(); + } + } + + for (String key : gtpcMap.keySet()){ + System.out.println(key +"---"+gtpcMap.get(key)); + } + } + + /** + * 获取HBase内String类型的值 + * + * @param result 结果集 + * @param familyName 列族名称 + * @param columnName 列名称 + * @return 结果数据 + */ + private static String getString(Result result, String familyName, String columnName) { + byte[] familyBytes = Bytes.toBytes(familyName); + byte[] columnBytes = Bytes.toBytes(columnName); + boolean contains = result.containsColumn(familyBytes, columnBytes); + if (contains) { + String data = Bytes.toString(result.getValue(familyBytes, columnBytes)).trim(); + if (StringUtil.isNotBlank(data)) { + return data; + } + } + + return ""; + } + + /** + * 获取HBase内String类型的值 + * + * @param result 结果集 + * @param columnName 列名称 + * @return 结果数据 + */ + private static Long getLong(Result result, String familyName, String columnName) { + byte[] familyBytes = Bytes.toBytes(familyName); + byte[] columnBytes = Bytes.toBytes(columnName); + boolean contains = result.containsColumn(familyBytes, columnBytes); + if (contains) { + return Bytes.toLong(result.getValue(familyBytes, columnBytes)); + } + return 0L; + } + + /** + * 获取HBase内String类型的值 + * + * @param result 结果集 + * @param columnName 列名称 + * @return 结果数据 + */ + private static String getTeid(Result result, String columnName) { + byte[] familyBytes = Bytes.toBytes(FlowWriteConfig.GTPC_FAMILY_NAME); + byte[] columnBytes = Bytes.toBytes(columnName); + boolean contains = result.containsColumn(familyBytes, columnBytes); + if (contains) { + String data = String.valueOf(Bytes.toLong(result.getValue(familyBytes, columnBytes))).trim(); + if (StringUtil.isNotBlank(data)) { + return data; + } + } + return "0"; + } + + /** + * 构建用户信息 + * + * @param phoneNumber 手机号 + * @param imsi 用户标识 + * @param imei 设备标识 + * @return 用户信息 + */ + private static HashMap<String, Object> buildUserData(String phoneNumber, String imsi, String imei, Long lastUpdateTime) { + HashMap<String, Object> tmpMap = new HashMap<>(4); + tmpMap.put("common_phone_number", phoneNumber); + tmpMap.put("common_imsi", imsi); + tmpMap.put("common_imei", imei); + tmpMap.put("last_update_time", lastUpdateTime); + return tmpMap; + } + + + /** + * 获取HBase内String类型的值 + * + * @param result 结果集 + * @return 结果数据 + */ + static String getVsysId(Result result) { + byte[] familyBytes = Bytes.toBytes(FlowWriteConfig.COMMON_FAMILY_NAME); + byte[] columnBytes = Bytes.toBytes("vsys_id"); + boolean contains = result.containsColumn(familyBytes, columnBytes); + if (contains) { + String data = String.valueOf(Bytes.toInt(result.getValue(familyBytes, columnBytes))).trim(); + if (StringUtil.isNotBlank(data)) { + return data; + } + } + return "1"; + } + + /** + * 判断缓存与新获取的数据时间戳大小,若大于缓存内记录的时间戳;则更新缓存 + * + * @param gtpcMap 缓存集合 + * @param key 上下行teid + * @param userData 获取HBase内的用户信息 + * @param lastUpdateTime 该用户信息最后更新时间 + */ + private static void updateCache(Map<String, HashMap<String, Object>> gtpcMap, String key, HashMap<String, Object> userData, Long lastUpdateTime) { + if (StringUtil.isNotBlank(key)){ + if (gtpcMap.containsKey(key)) { + Long oldUpdateTime = Long.parseLong(gtpcMap.get(key).get("last_update_time").toString()); + if (lastUpdateTime > oldUpdateTime) { + gtpcMap.put(key, userData); + } + } else { + gtpcMap.put(key, userData); + } + } + } + + +} diff --git a/src/test/java/com/zdjizhi/hdfs/FileUtilsTest.java b/src/test/java/com/zdjizhi/hdfs/FileUtilsTest.java new file mode 100644 index 
0000000..24f8696 --- /dev/null +++ b/src/test/java/com/zdjizhi/hdfs/FileUtilsTest.java @@ -0,0 +1,68 @@ +package com.zdjizhi.hdfs; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.FlowWriteConfig; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.IOUtils; +import org.junit.Test; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + +/** + * @author qidaijie + * @Package com.zdjizhi.tools.hdfs + * @Description: + * @date 2022/11/217:57 + */ +public class FileUtilsTest { + private static final Log logger = LogFactory.get(); + + private static FileSystem fileSystem; + + static { + Configuration configuration = new Configuration(); + try { + //创建fileSystem,用于连接hdfs + fileSystem = FileSystem.get(new URI(FlowWriteConfig.HDFS_URI),configuration); + } catch (IOException e) { + throw new RuntimeException(e); + } catch (URISyntaxException e) { + e.printStackTrace(); + } + } + + @Test + public void mkdir() throws Exception{ + fileSystem.mkdirs(new Path("/IPlocation/ETL-SESSION-RECORD-COMPLETED")); + } + + @Test + public void create() throws Exception{ + FSDataOutputStream outputStream = fileSystem.create(new Path("/qitest/test/test.txt")); + outputStream.write("Hello World".getBytes()); + outputStream.flush(); + outputStream.close(); + } + + @Test + public void cat() throws Exception{ + FSDataInputStream inputStream = fileSystem.open(new Path("/qitest/test/test.txt")); + IOUtils.copyBytes(inputStream, System.out, 1024); + inputStream.close(); + } + + @Test + public void rename() throws Exception{ + fileSystem.rename(new Path("/qitest"), new Path("/IPlocation")); + } + + @Test + public void delete() throws Exception{ + fileSystem.delete(new Path("/qitest/test2"),true);//是否递归删除 + } +} + diff --git a/src/test/java/com/zdjizhi/hos/hosUtilsTest.java b/src/test/java/com/zdjizhi/hos/hosUtilsTest.java new file mode 100644 index 0000000..27f4c60 --- /dev/null +++ b/src/test/java/com/zdjizhi/hos/hosUtilsTest.java @@ -0,0 +1,101 @@ +package com.zdjizhi.hos; + +import cn.hutool.core.io.IoUtil; +import cn.hutool.http.HttpUtil; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Joiner; +import com.maxmind.db.CHMCache; +import com.maxmind.db.Reader; +import com.zdjizhi.common.CustomFile; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.tools.http.HttpClientUtils; +import com.zdjizhi.utils.IpLookupV2; +import com.zdjizhi.utils.StringUtil; +import org.apache.http.Header; +import org.apache.http.message.BasicHeader; +import org.junit.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; + +/** + * @author qidaijie + * @Package com.zdjizhi.hos + * @Description: + * @date 2022/11/713:55 + */ +public class hosUtilsTest { + @Test + public void downloadToLocalTest() { + FileOutputStream outputStream = null; + InputStream inputStream = null; + try { + HttpClientUtils httpClientUtils = new HttpClientUtils(); + Header header = new BasicHeader("token", FlowWriteConfig.HOS_TOKEN); + String url = "http://192.168.44.12:9098/hos/knowledge_base_hos_bucket/a69c41bc-c673-46da-a810-f29aa18a3a16-aXBfdjRfYnVpbHRfaW4=.mmdb"; + inputStream = httpClientUtils.httpGetInputStream(url, 3000, header); + File file = new File(FlowWriteConfig.TOOLS_LIBRARY.concat(File.separator).concat("ip_v4_built_in.mmdb")); + if (!file.getParentFile().exists()) { + 
file.getParentFile().mkdir(); + } + outputStream = new FileOutputStream(file); + IoUtil.copy(inputStream, outputStream); + } catch (IOException | RuntimeException e) { + e.printStackTrace(); + } finally { + IoUtil.close(inputStream); + IoUtil.close(outputStream); + } + } + + + @Test + public void locationTest() { + try { + CustomFile customFile = new CustomFile(); + HttpClientUtils httpClientUtils = new HttpClientUtils(); + Header header = new BasicHeader("token", FlowWriteConfig.HOS_TOKEN); + String url = "http://192.168.44.12:9098/hos/knowledge_base_hos_bucket/a69c41bc-c673-46da-a810-f29aa18a3a16-aXBfdjRfYnVpbHRfaW4=.mmdb"; + InputStream inputStream = httpClientUtils.httpGetInputStream(url, 3000, header); + customFile.setFileName("ip_v4_built_in.mmdb"); + Reader reader = new Reader(inputStream, new CHMCache()); + InetAddress ipAddress = InetAddress.getByName("114.64.231.114"); + JsonNode jsonNode = reader.get(ipAddress); + if (jsonNode != null) { + System.out.println(StringUtil.setDefaultIfEmpty(jsonNode.toString(), "unkonw").toString()); + System.out.println(Joiner.on(".").useForNull("").join(jsonNode.get("COUNTRY"), + jsonNode.get("SUPER_ADMINISTRATIVE_AREA"), jsonNode.get("ADMINISTRATIVE_AREA"))); + System.out.println(Joiner.on(".").useForNull("").join(jsonNode.get("COUNTRY"), + jsonNode.get("SUPER_ADMINISTRATIVE_AREA"), jsonNode.get("ADMINISTRATIVE_AREA")).replace("\"", "")); + } + } catch (IOException | RuntimeException e) { + e.printStackTrace(); + } + } + + @Test + public void asnTest() { + try { + CustomFile customFile = new CustomFile(); + HttpClientUtils httpClientUtils = new HttpClientUtils(); + Header header = new BasicHeader("token", FlowWriteConfig.HOS_TOKEN); + String url = "http://192.168.44.12:9098/hos/knowledge_base_hos_bucket/1b96764c-59dd-4d6b-8edb-623705f708a5-YXNuX3Y0.mmdb"; + InputStream inputStream = httpClientUtils.httpGetInputStream(url, 3000, header); + customFile.setFileName("asn_v4.mmdb"); + Reader reader = new Reader(inputStream, new CHMCache()); + InetAddress ipAddress = InetAddress.getByName("114.64.231.114"); + JsonNode jsonNode = reader.get(ipAddress); + if (jsonNode != null) { + System.out.println(StringUtil.setDefaultIfEmpty(jsonNode.get("ASN"), "unkonw").toString()); + System.out.println(StringUtil.setDefaultIfEmpty(jsonNode.get("ASN"), "unkonw").toString().replace("\"", "")); + System.out.println(StringUtil.setDefaultIfEmpty(jsonNode.toString(), "unkonw").toString()); + } + } catch (IOException | RuntimeException e) { + e.printStackTrace(); + } + } + +} diff --git a/src/test/java/com/zdjizhi/json/JsonPathTest.java b/src/test/java/com/zdjizhi/json/JsonPathTest.java index cd7ada3..9f22954 100644 --- a/src/test/java/com/zdjizhi/json/JsonPathTest.java +++ b/src/test/java/com/zdjizhi/json/JsonPathTest.java @@ -1,23 +1,20 @@ package com.zdjizhi.json; +import cn.hutool.json.JSONObject; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.alibaba.nacos.api.NacosFactory; import com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.config.ConfigService; -import com.alibaba.nacos.api.config.listener.Listener; import com.alibaba.nacos.api.exception.NacosException; import com.jayway.jsonpath.DocumentContext; import com.jayway.jsonpath.JsonPath; -import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.tools.json.JsonPathUtil; import org.junit.Test; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Properties; -import 
java.util.concurrent.Executor; /** * @author qidaijie @@ -28,52 +25,56 @@ import java.util.concurrent.Executor; public class JsonPathTest { private static final Log logger = LogFactory.get(); - private static Properties propNacos = new Properties(); + @Test + public void arrayFilterTest() { + String json = "[{\"tunnels_schema_type\":\"GTP\",\"gtp_uplink_teid\":16777219,\"gtp_downlink_teid\":16777219,\"gtp_sgw_ip\":\"120.36.3.97\",\"gtp_pgw_ip\":\"43.224.53.100\",\"gtp_sgw_port\":2152,\"gtp_pgw_port\":51454},{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"80:69:33:ea:a5:57\",\"destination_mac\":\"14:09:dc:df:a3:40\"}]"; + DocumentContext parse = JsonPath.parse(json); + List<Object> fields = parse.read("$.[?(@.tunnels_schema_type=='GTP')]"); + if (fields.size() > 0) { + JSONObject gtpJson = new JSONObject(fields.get(0), false, true); + System.out.println(gtpJson.getStr("gtp_uplink_teid")); + System.out.println(gtpJson.getStr("gtp_downlink_teid")); + } else { + System.out.println("no has"); + } - /** - * 获取需要删除字段的列表 - */ - private static ArrayList<String> dropList = new ArrayList<>(); - /** - * 在内存中加载反射类用的map - */ - private static HashMap<String, Class> map; + ArrayList<Object> read = JsonPath.parse(json).read("$.[?(@.tunnels_schema_type=='GTP')].gtp_uplink_teid"); + String s = read.get(0).toString(); + System.out.println("string:" + s); + } - /** - * 获取任务列表 - * list的每个元素是一个四元字符串数组 (有format标识的字段,补全的字段,用到的功能函数,用到的参数),例如: - * (mail_subject mail_subject decode_of_base64 mail_subject_charset) - */ - private static ArrayList<String[]> jobList; - private static String schema; + @Test + public void getTeidTest() { + String message = "[{\"tunnels_schema_type\":\"GTP\",\"gtp_uplink_teid\":999997894,\"gtp_downlink_teid\":665547895,\"gtp_sgw_ip\":\"192.56.5.2\",\"gtp_pgw_ip\":\"192.56.10.20\",\"gtp_sgw_port\":2152,\"gtp_pgw_port\":2152},{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\",\"c2s_source_mac\":\"00:0c:29:9b:2f:a4\",\"c2s_destination_mac\":\"00:0c:29:15:b4:f4\",\"s2c_source_mac\":\"00:0c:29:15:b4:f4\",\"s2c_destination_mac\":\"00:0c:29:9b:2f:a4\"}]"; + System.out.println(JsonPathUtil.getTeidValue(message, "$.[?(@.tunnels_schema_type=='GTP')].gtp_uplink_teid")); + System.out.println(JsonPathUtil.getTeidValue(message, "$.[?(@.tunnels_schema_type=='GTP')].gtp_downlink_teid")); + } + - static { - propNacos.setProperty(PropertyKeyConst.SERVER_ADDR, FlowWriteConfig.NACOS_SERVER); - propNacos.setProperty(PropertyKeyConst.NAMESPACE, FlowWriteConfig.NACOS_SCHEMA_NAMESPACE); - propNacos.setProperty(PropertyKeyConst.USERNAME, FlowWriteConfig.NACOS_USERNAME); - propNacos.setProperty(PropertyKeyConst.PASSWORD, FlowWriteConfig.NACOS_PIN); + @Test + public void getFileMetaTest() { + Properties properties = new Properties(); + properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848"); + properties.setProperty(PropertyKeyConst.NAMESPACE, "public"); + properties.setProperty(PropertyKeyConst.USERNAME, "nacos"); + properties.setProperty(PropertyKeyConst.PASSWORD, "nacos"); + ConfigService configService = null; try { - ConfigService configService = NacosFactory.createConfigService(propNacos); - String dataId = FlowWriteConfig.NACOS_DATA_ID; - String group = FlowWriteConfig.NACOS_GROUP; - String config = configService.getConfig(dataId, group, 5000); - if (StringUtil.isNotBlank(config)) { - schema = config; + configService = NacosFactory.createConfigService(properties); + String message = configService.getConfig("knowledge_base.json", "DEFAULT_GROUP", 5000); + ArrayList<Object> read = 
JsonPath.parse(message).read("$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined','asn_v4','asn_v6'])].['name','sha256','format','path']"); + if (read.size() >= 1) { + for (Object metadata : read) { + System.out.println(metadata.toString()); + JSONObject knowledgeJson = new JSONObject(metadata, false, true); + System.out.println("fileName:" + knowledgeJson.getStr("name") + "." + knowledgeJson.getStr("format") + " ---- filePath:" + knowledgeJson.getStr("path") + "\n"); + } } } catch (NacosException e) { - logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage()); + e.printStackTrace(); } - } - @Test - public void parseSchemaGetFields() { - DocumentContext parse = JsonPath.parse(schema); - List<Object> fields = parse.read("$.fields[*]"); - for (Object field : fields) { - String name = JsonPath.read(field, "$.name").toString(); - String type = JsonPath.read(field, "$.type").toString(); - } } } diff --git a/src/test/java/com/zdjizhi/json/JsonTest.java b/src/test/java/com/zdjizhi/json/JsonTest.java deleted file mode 100644 index 597da40..0000000 --- a/src/test/java/com/zdjizhi/json/JsonTest.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.zdjizhi.json; - -import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.json.JsonParseUtil; -import org.junit.Test; - -import java.util.Map; - -/** - * @author qidaijie - * @Package com.zdjizhi.json - * @Description: - * @date 2022/5/515:08 - */ -public class JsonTest { - - @Test - public void JacksonTest() { - String value = "{\"common_log_id\":null}"; - Map<String, Object> json = (Map<String, Object>) JsonMapper.fromJsonString(value, Map.class); - System.out.println(json.get("common_log_id")); - } -}
\ No newline at end of file diff --git a/src/test/java/com/zdjizhi/json/NewSchemaTest.java b/src/test/java/com/zdjizhi/json/NewSchemaTest.java new file mode 100644 index 0000000..18f14ff --- /dev/null +++ b/src/test/java/com/zdjizhi/json/NewSchemaTest.java @@ -0,0 +1,102 @@ +package com.zdjizhi.json; + +import cn.hutool.json.JSONArray; +import cn.hutool.json.JSONObject; +import com.alibaba.fastjson.JSON; +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.exception.NacosException; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.StringUtil; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Properties; + +/** + * Applicable to schemas >= TSG22.08 + * + * @author qidaijie + * @Package com.zdjizhi.nacos + * @Description: + * @date 2022/3/1714:57 + */ +public class NewSchemaTest { + + private static Properties properties = new Properties(); + + + static { + properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848"); + properties.setProperty(PropertyKeyConst.NAMESPACE, "test"); + properties.setProperty(PropertyKeyConst.USERNAME, "nacos"); + properties.setProperty(PropertyKeyConst.PASSWORD, "nacos"); + } + + + @Test + public void newSchemaTest() { + try { + + ConfigService configService = NacosFactory.createConfigService(properties); + String dataId = "session_record.json"; + String group = "Galaxy"; + ArrayList<String[]> newJobList = getNewJobList(configService.getConfig(dataId, group, 5000)); + + for (String[] job : newJobList) { + System.out.println(Arrays.toString(job)); + } + } catch (NacosException e) { + e.printStackTrace(); + } + } + + + /** + * 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList paramlist) + * + * @return 任务列表 + */ + private static ArrayList<String[]> getNewJobList(String schema) { + ArrayList<String[]> list = new ArrayList<>(); + + JSONObject schemaJson = new JSONObject(schema, false, true); + JSONArray fields = schemaJson.getJSONArray("fields"); + for (Object field : fields) { + JSONObject fieldJson = new JSONObject(field, false, true); + boolean hasDoc = fieldJson.containsKey("doc"); + if (hasDoc) { + JSONObject docJson = fieldJson.getJSONObject("doc"); + boolean hasFormat = docJson.containsKey("format"); + if (hasFormat) { + String name = fieldJson.getStr("name"); + JSONArray formatList = docJson.getJSONArray("format"); + for (Object format : formatList) { + JSONObject formatJson = new JSONObject(format, false, true); + String function = formatJson.getStr("function"); + String appendTo; + String params = null; + + if (formatJson.containsKey("appendTo")) { + appendTo = formatJson.getStr("appendTo"); + } else { + appendTo = name; + } + + if (formatJson.containsKey("param")) { + params = formatJson.getStr("param"); + } + + list.add(new String[]{name, appendTo, function, params}); + + } + } + } + } + + return list; + } + +} diff --git a/src/test/java/com/zdjizhi/nacos/SchemaListener.java b/src/test/java/com/zdjizhi/json/OldSchemaTest.java index 741b2a3..42c76db 100644 --- a/src/test/java/com/zdjizhi/nacos/SchemaListener.java +++ b/src/test/java/com/zdjizhi/json/OldSchemaTest.java @@ -1,4 +1,5 @@ -package com.zdjizhi.nacos; +package com.zdjizhi.json; + import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; @@ -6,7 +7,6 @@ import com.alibaba.fastjson.JSONObject; import com.alibaba.nacos.api.NacosFactory; import 
com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.config.ConfigService; -import com.alibaba.nacos.api.config.listener.Listener; import com.alibaba.nacos.api.exception.NacosException; import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.utils.StringUtil; @@ -15,67 +15,51 @@ import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.Properties; -import java.util.concurrent.Executor; /** + * Applicable to schemas < TSG22.08 * @author qidaijie * @Package com.zdjizhi.nacos * @Description: * @date 2022/3/1714:57 */ -public class SchemaListener { +public class OldSchemaTest { private static Properties properties = new Properties(); - private static ArrayList<String[]> jobList; static { - properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.40.43:8848"); - properties.setProperty(PropertyKeyConst.NAMESPACE, "test"); + properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848"); + properties.setProperty(PropertyKeyConst.NAMESPACE, "prod"); properties.setProperty(PropertyKeyConst.USERNAME, "nacos"); properties.setProperty(PropertyKeyConst.PASSWORD, "nacos"); + } + + + @Test + public void oldSchemaTest() { try { ConfigService configService = NacosFactory.createConfigService(properties); String dataId = "session_record.json"; String group = "Galaxy"; - jobList = getJobListFromHttp(configService.getConfig(dataId, group, 5000)); - configService.addListener(dataId, group, new Listener() { - @Override - public Executor getExecutor() { - return null; - } + ArrayList<String[]> oldJobList = getOldJobList(configService.getConfig(dataId, group, 5000)); - @Override - public void receiveConfigInfo(String configMsg) { - jobList = getJobListFromHttp(configMsg); - } - }); + for (String[] job : oldJobList) { + System.out.println(Arrays.toString(job)); + } } catch (NacosException e) { e.printStackTrace(); } } - - @Test - public void dealCommonMessage() { - //keep running,change nacos config,print new config - while (true) { - try { - System.out.println(Arrays.toString(jobList.get(0))); - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - /** - * 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList paramlist) + * 解析schema,解析之后返回一个任务列表 (useList toList funcList paramlist) * + * @param schema 日志schema * @return 任务列表 */ - private static ArrayList<String[]> getJobListFromHttp(String schema) { + private static ArrayList<String[]> getOldJobList(String schema) { ArrayList<String[]> list = new ArrayList<>(); //获取fields,并转化为数组,数组的每个元素都是一个name doc type @@ -132,5 +116,4 @@ public class SchemaListener { } return list; } - } diff --git a/src/test/java/com/zdjizhi/nacos/NacosTest.java b/src/test/java/com/zdjizhi/nacos/NacosTest.java index 7745d5f..55dca3f 100644 --- a/src/test/java/com/zdjizhi/nacos/NacosTest.java +++ b/src/test/java/com/zdjizhi/nacos/NacosTest.java @@ -90,7 +90,7 @@ public class NacosTest { } //keep running,change nacos config,print new config - while (true) { + for (int i = 0; i < 3; i++) { try { Thread.sleep(5000); } catch (InterruptedException e) { diff --git a/src/test/java/com/zdjizhi/nacos/SchemaListenerTest.java b/src/test/java/com/zdjizhi/nacos/SchemaListenerTest.java new file mode 100644 index 0000000..72ae2e0 --- /dev/null +++ b/src/test/java/com/zdjizhi/nacos/SchemaListenerTest.java @@ -0,0 +1,116 @@ +package com.zdjizhi.nacos; + +import cn.hutool.json.JSONArray; +import cn.hutool.json.JSONObject; +import com.alibaba.nacos.api.NacosFactory; +import 
com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.config.listener.Listener; +import com.alibaba.nacos.api.exception.NacosException; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Properties; +import java.util.concurrent.Executor; + +/** + * @author qidaijie + * @Package com.zdjizhi.nacos + * @Description: + * @date 2022/3/1714:57 + */ +public class SchemaListenerTest { + + private static Properties properties = new Properties(); + private static ArrayList<String[]> jobList; + + + static { + properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848"); + properties.setProperty(PropertyKeyConst.NAMESPACE, "test"); + properties.setProperty(PropertyKeyConst.USERNAME, "nacos"); + properties.setProperty(PropertyKeyConst.PASSWORD, "nacos"); + + try { + ConfigService configService = NacosFactory.createConfigService(properties); + String dataId = "session_record.json"; + String group = "Galaxy"; + jobList = getJobListFromHttp(configService.getConfig(dataId, group, 5000)); + configService.addListener(dataId, group, new Listener() { + @Override + public Executor getExecutor() { + return null; + } + + @Override + public void receiveConfigInfo(String configMsg) { + jobList = getJobListFromHttp(configMsg); + } + }); + } catch (NacosException e) { + e.printStackTrace(); + } + } + + + @Test + public void dealCommonMessage() { + //keep running,change nacos config,print new config + for (int i = 0; i < 1; i++) { + try { + for (String[] job : jobList) { + System.out.println(Arrays.toString(job)); + } + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + /** + * 根据http链接获取schema,解析之后返回一个任务列表 (useList toList funcList paramlist) + * + * @return 任务列表 + */ + private static ArrayList<String[]> getJobListFromHttp(String schema) { + ArrayList<String[]> list = new ArrayList<>(); + + JSONObject schemaJson = new JSONObject(schema, false, true); + JSONArray fields = schemaJson.getJSONArray("fields"); + for (Object field : fields) { + JSONObject fieldJson = new JSONObject(field, false, true); + boolean hasDoc = fieldJson.containsKey("doc"); + if (hasDoc) { + JSONObject docJson = fieldJson.getJSONObject("doc"); + boolean hasFormat = docJson.containsKey("format"); + if (hasFormat) { + String name = fieldJson.getStr("name"); + JSONArray formatList = docJson.getJSONArray("format"); + for (Object format : formatList) { + JSONObject formatJson = new JSONObject(format, false, true); + String function = formatJson.getStr("function"); + String appendTo = null; + String params = null; + + if (formatJson.containsKey("appendTo")) { + appendTo = formatJson.getStr("appendTo"); + } else { + appendTo = name; + } + + if (formatJson.containsKey("param")) { + params = formatJson.getStr("param"); + } + + list.add(new String[]{name, appendTo, function, params}); + + } + } + } + } + + return list; + } +} |
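
Note on the broadcast-state change in LogFlowWriteTopology: the topology now broadcasts a List<CustomFile> knowledge-base stream, connects it to the Kafka log stream, and calls process(new MapCompleted()) / process(new TypeMapCompleted()). The actual MapCompleted and TypeMapCompleted classes are not shown in this excerpt, so the following is only a minimal sketch of a Flink BroadcastProcessFunction that matches the MapStateDescriptor declared in the topology; the state key "knowledgebase" and the enrichment placeholder are illustrative assumptions, not the project's implementation.

// Sketch only: reuses the topology's descriptor shape (String -> List<CustomFile>);
// the enrichment logic and output serialization are placeholders.
import com.zdjizhi.common.CustomFile;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;

public class MapCompletedSketch extends BroadcastProcessFunction<Map<String, Object>, List<CustomFile>, String> {

    // Must be the same descriptor the topology used when calling broadcast(descriptor).
    private static final MapStateDescriptor<String, List<CustomFile>> DESCRIPTOR =
            new MapStateDescriptor<>("descriptor", Types.STRING, Types.LIST(TypeInformation.of(CustomFile.class)));

    @Override
    public void processBroadcastElement(List<CustomFile> files, Context ctx, Collector<String> out) throws Exception {
        // Store the latest knowledge-base file list so the log-processing side can read it.
        ctx.getBroadcastState(DESCRIPTOR).put("knowledgebase", files);
    }

    @Override
    public void processElement(Map<String, Object> log, ReadOnlyContext ctx, Collector<String> out) throws Exception {
        // Read the most recently broadcast knowledge-base files (may be null before the first broadcast).
        List<CustomFile> files = ctx.getBroadcastState(DESCRIPTOR).get("knowledgebase");
        // ... complete/enrich the log record with the knowledge base here ...
        out.collect(log.toString()); // placeholder for the real JSON serialization
    }
}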

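Note on dynamic knowledge-base loading via Nacos: the commit adds separate knowledge-base settings (nacos.knowledgebase.namespace, nacos.knowledgebase.data.id, group DEFAULT_GROUP), and JsonPathTest.getFileMetaTest shows how the knowledge_base.json metadata is filtered. The sketch below only illustrates how a listener on that data id could trigger re-parsing when the config changes; the server address, namespace, and credentials are the test values appearing elsewhere in this commit, and onKnowledgeBaseChanged is a hypothetical hook rather than the project's actual reload path.

// Sketch only: initial load plus a Nacos listener for knowledge_base.json in DEFAULT_GROUP.
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.jayway.jsonpath.JsonPath;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executor;

public class KnowledgeBaseListenerSketch {

    public static void main(String[] args) throws NacosException {
        Properties props = new Properties();
        props.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848");
        props.setProperty(PropertyKeyConst.NAMESPACE, "public");
        props.setProperty(PropertyKeyConst.USERNAME, "nacos");
        props.setProperty(PropertyKeyConst.PASSWORD, "nacos");

        ConfigService configService = NacosFactory.createConfigService(props);
        // Initial load of the knowledge-base metadata.
        onKnowledgeBaseChanged(configService.getConfig("knowledge_base.json", "DEFAULT_GROUP", 5000));
        // Re-parse whenever the config is published again in Nacos.
        configService.addListener("knowledge_base.json", "DEFAULT_GROUP", new Listener() {
            @Override
            public Executor getExecutor() {
                return null; // use Nacos' internal notification thread
            }

            @Override
            public void receiveConfigInfo(String configMsg) {
                onKnowledgeBaseChanged(configMsg);
            }
        });
    }

    private static void onKnowledgeBaseChanged(String message) {
        // Same JsonPath filter as getFileMetaTest: latest versions of the built-in location/ASN files.
        List<Object> metadata = JsonPath.parse(message).read(
                "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined','asn_v4','asn_v6'])].['name','sha256','format','path']");
        metadata.forEach(System.out::println); // placeholder: trigger the actual file download/reload here
    }
}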