summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <[email protected]>2023-05-30 09:55:44 +0800
committerunknown <[email protected]>2023-05-30 09:55:44 +0800
commitbeecc5dcbcba96b2f29482ab94922ba34cc35a6a (patch)
tree806633efe9ce48984bb6f857ee8b5f5d9924eb74
parentce15a27a1bca76145e75884cadf17e34db180478 (diff)
TSG-15167 LTS版本新增知识库文件校验功能tsg-22.11
-rw-r--r--src/main/java/com/zdjizhi/source/HttpSource.java136
-rw-r--r--src/main/java/com/zdjizhi/source/SingleHttpSource.java185
2 files changed, 217 insertions, 104 deletions
diff --git a/src/main/java/com/zdjizhi/source/HttpSource.java b/src/main/java/com/zdjizhi/source/HttpSource.java
index 6451fc1..3080d25 100644
--- a/src/main/java/com/zdjizhi/source/HttpSource.java
+++ b/src/main/java/com/zdjizhi/source/HttpSource.java
@@ -2,6 +2,8 @@ package com.zdjizhi.source;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.IoUtil;
+import cn.hutool.core.io.file.FileReader;
+import cn.hutool.crypto.digest.DigestUtil;
import cn.hutool.json.JSONObject;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
@@ -28,10 +30,18 @@ import java.util.concurrent.Executor;
public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
-
private static final Logger logger = LoggerFactory.getLogger(HttpSource.class);
+ private static final int TRY_TIMES = 3;
+
private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']";
+// private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_user_defined'])].['name','sha256','format','path']";
+
+ private static Map<String, String> knowledgeMetaCache = new HashMap<>();
+
+ private static HashMap<String, byte[]> knowledgeFileCache;
+
+ private static HttpClientUtils2 httpClientUtils;
//连接nacos的配置
private Properties nacosProperties;
@@ -50,11 +60,17 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
private ConfigService configService;
-// private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
-// private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeLog.class);
- private static Map<String, String> updateMap = new HashMap<>();
- private static HashMap<String, byte[]> knowledgeFileCache;
+ private static Header header;
+
+ //运行状态cancel时置为false
private boolean isRunning = true;
+ //是否下发,默认不发送
+ private boolean isSending = false;
+
+
+
+
+// private boolean isRunning = true;
public HttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT, String storePath) {
@@ -68,10 +84,12 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
+ httpClientUtils = new HttpClientUtils2();
//初始化元数据缓存
- updateMap = new HashMap<>(16);
+ knowledgeMetaCache = new HashMap<>(16);
//初始化定位库缓存
knowledgeFileCache = new HashMap<>(16);
+ header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
logger.info("连接nacos:" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR));
configService = NacosFactory.createConfigService(nacosProperties);
}
@@ -79,13 +97,12 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
public void run(SourceContext ctx) throws Exception {
// ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
- SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- String format = formatter.format(new Date());
- logger.info(format + "receive config from nacos:" + config);
- System.out.println(format + "receive config from nacos:" + config);
if (StringUtil.isNotBlank(config)) {
ArrayList<Object> metaList = JsonPath.parse(config).read(EXPR);
loadKnowledge(metaList);
+ if (isSending){
+ ctx.collect(knowledgeFileCache);
+ }
}
@@ -108,13 +125,14 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
knowledgeJson.getStr("format"));
String sha256 = knowledgeJson.getStr("sha256");
String filePath = knowledgeJson.getStr("path");
- if (!sha256.equals(updateMap.get(fileName))) {
- updateMap.put(fileName, sha256);
- updateKnowledge(fileName, filePath);
+ if (!sha256.equals(knowledgeMetaCache.get(fileName))) {
+ knowledgeMetaCache.put(fileName, sha256);
+ updateKnowledge(fileName, filePath,sha256);
}
-
}
- ctx.collect(knowledgeFileCache);
+ if (isSending){
+ ctx.collect(knowledgeFileCache);
+ }
}
}
@@ -126,7 +144,11 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
});
while (isRunning) {
- Thread.sleep(10000);
+ try {
+ Thread.sleep(10000);
+ }catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
}
@@ -141,14 +163,22 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
knowledgeJson.getStr("format"));
String sha256 = knowledgeJson.getStr("sha256");
String filePath = knowledgeJson.getStr("path");
- Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
- HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
- inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
- updateMap.put(fileName, sha256);
- knowledgeFileCache.put(fileName, IOUtils.toByteArray(inputStream));
+ byte[] localFileByte = getLocalFile(fileName);
+ String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
+ if (sha256.equals(localFileSha256Hex)){
+ logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, localFileSha256Hex, sha256);
+ knowledgeMetaCache.put(fileName, sha256);
+// knowledgeFileCache.put(fileName, IOUtils.toByteArray(inputStream));
+ knowledgeFileCache.put(fileName, localFileByte);
+ }else {
+ logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等,更新本地文件及缓存", fileName, localFileSha256Hex, sha256);
+ updateKnowledge(fileName,filePath,sha256);
+ }
+
}
}
- } catch (IOException ioException) {
+// } catch (IOException ioException) {
+ } catch (Exception ioException) {
ioException.printStackTrace();
} finally {
IOUtils.closeQuietly(inputStream);
@@ -156,23 +186,69 @@ public class HttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
}
- private void updateKnowledge(String fileName, String filePath) {
+ private void updateKnowledge(String fileName, String filePath,String sha256) {
InputStream inputStream = null;
- FileOutputStream outputStream = null;
+// FileOutputStream outputStream = null;
+ int retryNum = 0;
try {
- Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
- HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
- inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
- byte[] bytes = IOUtils.toByteArray(inputStream);
- HdfsUtils.uploadFileByBytes(CommonConfig.HDFS_PATH + fileName, bytes);
- knowledgeFileCache.put(fileName, bytes);
+ while (retryNum < TRY_TIMES){
+ inputStream = httpClientUtils.httpGetInputStream(filePath, 90000, header);
+ if (inputStream !=null){
+ byte[] downloadBytes = IOUtils.toByteArray(inputStream);
+ if (downloadBytes.length>0){
+ String downloadFileSha256Hex = DigestUtil.sha256Hex(downloadBytes);
+ if (sha256.equals(downloadFileSha256Hex)){
+ logger.info("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, sha256);
+// HdfsUtils.uploadFileByBytes(CommonConfig.HDFS_PATH + fileName, downloadBytes);
+ knowledgeMetaCache.put(fileName,sha256);
+ knowledgeFileCache.put(fileName, downloadBytes);
+ updateLocalFile(fileName);
+ retryNum = TRY_TIMES;
+
+ isSending = true;
+ }else {
+ logger.error("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等 开始第{}次重试下载文件", fileName, downloadFileSha256Hex, sha256, retryNum);
+ retryNum++;
+ }
+ }
+ }
+ }
} catch (IOException ioException) {
ioException.printStackTrace();
} finally {
IOUtils.closeQuietly(inputStream);
+// IOUtils.closeQuietly(outputStream);
+ }
+ }
+
+
+ private void updateLocalFile(String fileName) {
+ FileOutputStream outputStream = null;
+ try {
+ HdfsUtils.uploadFileByBytes(CommonConfig.HDFS_PATH + fileName, knowledgeFileCache.get(fileName));
+ } catch (IOException ioe) {
+ logger.error("更新本地文件{}时发生IO异常,异常信息为:", fileName, ioe.getMessage());
+ ioe.printStackTrace();
+ } catch (RuntimeException e) {
+ logger.error("更新本地文件{}时发生异常,异常信息为:", fileName, e.getMessage());
+ e.printStackTrace();
+ } finally {
IOUtils.closeQuietly(outputStream);
}
+
}
+
+ private static byte[] getLocalFile(String name) {
+ byte[] fileBytes = null;
+ try {
+ fileBytes = HdfsUtils.getFileBytes(CommonConfig.HDFS_PATH + name) ;
+ } catch (RuntimeException | IOException e) {
+ logger.error("IpLookupUtils download MMDB files error, message is:" + e.getMessage());
+ e.printStackTrace();
+ }
+ return fileBytes;
+ }
+
@Override
public void cancel() {
this.isRunning = false;
diff --git a/src/main/java/com/zdjizhi/source/SingleHttpSource.java b/src/main/java/com/zdjizhi/source/SingleHttpSource.java
index a72a82a..adfddb9 100644
--- a/src/main/java/com/zdjizhi/source/SingleHttpSource.java
+++ b/src/main/java/com/zdjizhi/source/SingleHttpSource.java
@@ -2,6 +2,8 @@ package com.zdjizhi.source;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.IoUtil;
+import cn.hutool.core.io.file.FileReader;
+import cn.hutool.crypto.digest.DigestUtil;
import cn.hutool.json.JSONObject;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
@@ -23,13 +25,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
+import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.Executor;
public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>> {
private static final Logger logger = LoggerFactory.getLogger(SingleHttpSource.class);
- private static HashMap<String, byte[]> knowledgeFileCache;
private Properties nacosProperties;
@@ -41,16 +43,23 @@ public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>
private static String STORE_PATH;
+ private static HttpClientUtils2 httpClientUtils ;
+
private ConfigService configService;
-// private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
-// private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeLog.class);
+ private static Header header;
+
private static final String EXPR = "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined'])].['name','sha256','format','path']";
- private static Map<String, String> updateMap = new HashMap<>();
+ private static Map<String, String> knowledgeMetaCache = new HashMap<>();
+ private static HashMap<String, byte[]> knowledgeFileCache;
private boolean isRunning = true;
+ //是否下发,默认不发送
+ private boolean isSending = false;
+
+ private static final int TRY_TIMES = 3;
public SingleHttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT) {
@@ -65,33 +74,33 @@ public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
+ httpClientUtils = new HttpClientUtils2();
logger.info("连接nacos:" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR));
configService = NacosFactory.createConfigService(nacosProperties);
//初始化元数据缓存
- updateMap = new HashMap<>(16);
+ knowledgeMetaCache = new HashMap<>(16);
//初始化定位库缓存
knowledgeFileCache = new HashMap<>(16);
+
+ header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
}
@Override
public void run(SourceContext ctx) throws Exception {
// ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
-// List<CustomFile> customFiles = new ArrayList<>();
+ SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ String format = formatter.format(new Date());
+ logger.info(format + "receive config from nacos:" + config);
+ System.out.println(format + "receive config from nacos:" + config);
if (StringUtil.isNotBlank(config)) {
ArrayList<Object> metaList = JsonPath.parse(config).read(EXPR);
loadKnowledge(metaList);
+ if (isSending){
+ ctx.collect(knowledgeFileCache);
+ }
}
-// if (StringUtil.isNotBlank(config)) {
-// List<KnowledgeLog> knowledgeLogListList = jsonMapperInstance.fromJson(config, listType);
-// if (knowledgeLogListList.size()>=1){
-// for (KnowledgeLog knowledgeLog : knowledgeLogListList) {
-// String name = knowledgeLog.getName().concat(".").concat(knowledgeLog.getFormat());
-// String sha256 = knowledgeLog.getSha256();
-// updateMap.put(name,sha256);
-// }
-// }
-// }
+
configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() {
@Override
@@ -112,13 +121,14 @@ public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>
knowledgeJson.getStr("format"));
String sha256 = knowledgeJson.getStr("sha256");
String filePath = knowledgeJson.getStr("path");
- if (!sha256.equals(updateMap.get(fileName))) {
- updateMap.put(fileName, sha256);
- updateKnowledge(fileName, filePath);
+ if (!sha256.equals(knowledgeMetaCache.get(fileName))) {
+ knowledgeMetaCache.put(fileName, sha256);
+ updateKnowledge(fileName, filePath,sha256);
}
-
}
- ctx.collect(knowledgeFileCache);
+ if (isSending){
+ ctx.collect(knowledgeFileCache);
+ }
}
}
@@ -137,78 +147,105 @@ public class SingleHttpSource extends RichHttpSourceFunction<Map<String, byte[]>
}
-// private CustomFile loadKnowledge(String fileName, String filePath) {
+ private void loadKnowledge(ArrayList<Object> metaList) {
// InputStream inputStream = null;
-// FileOutputStream outputStream = null;
-// CustomFile customFile = new CustomFile();
-// try {
-// customFile.setFileName(fileName);
-// Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
-// HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
-// inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
-// FileUtil.mkdir(CommonConfig.DOWNLOAD_PATH);
-// File file = new File(CommonConfig.DOWNLOAD_PATH.concat(File.separator).concat(fileName));
-// outputStream = new FileOutputStream(file);
-// byte[] bytes = IOUtils.toByteArray(inputStream);
-// customFile.setContent(bytes);
-// inputStream = new ByteArrayInputStream(customFile.getContent());
-// IoUtil.copy(inputStream, outputStream);
-//
-// } catch (IOException ioException) {
-// ioException.printStackTrace();
-// } finally {
+ try {
+ if (metaList.size() >= 1) {
+ for (Object metadata : metaList) {
+ JSONObject knowledgeJson = new JSONObject(metadata, false, true);
+ String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
+ knowledgeJson.getStr("format"));
+ String sha256 = knowledgeJson.getStr("sha256");
+ String filePath = knowledgeJson.getStr("path");
+ byte[] localFileByte = getLocalFile(fileName);
+ String localFileSha256Hex = DigestUtil.sha256Hex(localFileByte);
+ if (sha256.equals(localFileSha256Hex)){
+ logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, localFileSha256Hex, sha256);
+ knowledgeMetaCache.put(fileName, sha256);
+ knowledgeFileCache.put(fileName, localFileByte);
+ }else {
+ logger.info("本地文件{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等,更新本地文件及缓存", fileName, localFileSha256Hex, sha256);
+ updateKnowledge(fileName,filePath,sha256);
+ }
+ }
+ }
+ } catch (RuntimeException exception) {
+ exception.printStackTrace();
+ }
+// finally {
// IOUtils.closeQuietly(inputStream);
-// IOUtils.closeQuietly(outputStream);
// }
-// return customFile;
-// }
-private void loadKnowledge(ArrayList<Object> metaList) {
- InputStream inputStream = null;
- try {
- if (metaList.size() >= 1) {
- for (Object metadata : metaList) {
- JSONObject knowledgeJson = new JSONObject(metadata, false, true);
- String fileName = Joiner.on(CommonConfig.LOCATION_SEPARATOR).useForNull("").join(knowledgeJson.getStr("name"),
- knowledgeJson.getStr("format"));
- String sha256 = knowledgeJson.getStr("sha256");
- String filePath = knowledgeJson.getStr("path");
- Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
- HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
+ }
+
+
+ private void updateKnowledge(String fileName, String filePath,String sha256) {
+ InputStream inputStream = null;
+// FileOutputStream outputStream = null;
+ int retryNum = 0;
+ try {
+ while (retryNum < TRY_TIMES){
inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
- updateMap.put(fileName, sha256);
- knowledgeFileCache.put(fileName, IOUtils.toByteArray(inputStream));
+ if (inputStream !=null){
+ byte[] downloadBytes = IOUtils.toByteArray(inputStream);
+ if (downloadBytes.length>0){
+ String downloadFileSha256Hex = DigestUtil.sha256Hex(downloadBytes);
+ if (sha256.equals(downloadFileSha256Hex)){
+ logger.info("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256相等", fileName, sha256);
+ knowledgeMetaCache.put(fileName, sha256);
+ knowledgeFileCache.put(fileName, downloadBytes);
+ updateLocalFile(fileName);
+ retryNum = TRY_TIMES;
+ isSending = true;
+ }else {
+ logger.error("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等 开始第{}次重试下载文件", fileName, downloadFileSha256Hex, sha256, retryNum);
+ retryNum++;
+ }
+
+ }
+ }
}
+
+ } catch (IOException ioException) {
+ ioException.printStackTrace();
+ } finally {
+ IOUtils.closeQuietly(inputStream);
}
- } catch (IOException ioException) {
- ioException.printStackTrace();
- } finally {
- IOUtils.closeQuietly(inputStream);
}
-}
- private void updateKnowledge(String fileName, String filePath) {
- InputStream inputStream = null;
+
+ private void updateLocalFile(String fileName) {
+// InputStream inputStream = null;
FileOutputStream outputStream = null;
try {
- Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
- HttpClientUtils2 httpClientUtils = new HttpClientUtils2();
- inputStream = httpClientUtils.httpGetInputStream(filePath, 3000, header);
FileUtil.mkdir(CommonConfig.DOWNLOAD_PATH);
File file = new File(CommonConfig.DOWNLOAD_PATH.concat(File.separator).concat(fileName));
outputStream = new FileOutputStream(file);
- byte[] bytes = IOUtils.toByteArray(inputStream);
- knowledgeFileCache.put(fileName, bytes);
- inputStream=new ByteArrayInputStream(bytes);
- IoUtil.copy(inputStream, outputStream);
- } catch (IOException ioException) {
- ioException.printStackTrace();
+ IoUtil.copy(new ByteArrayInputStream(knowledgeFileCache.get(fileName)), outputStream);
+ } catch (IOException ioe) {
+ logger.error("更新本地文件{}时发生IO异常,异常信息为:", fileName, ioe.getMessage());
+ ioe.printStackTrace();
+ } catch (RuntimeException e) {
+ logger.error("更新本地文件{}时发生异常,异常信息为:", fileName, e.getMessage());
+ e.printStackTrace();
} finally {
- IOUtils.closeQuietly(inputStream);
IOUtils.closeQuietly(outputStream);
}
+
}
+ private static byte[] getLocalFile(String name) {
+ byte[] fileBytes = null;
+ try {
+ fileBytes=new FileReader(CommonConfig.DOWNLOAD_PATH + name).readBytes();
+ } catch (RuntimeException e) {
+ logger.error("IpLookupUtils download MMDB files error, message is:" + e.getMessage());
+ e.printStackTrace();
+ }
+ return fileBytes;
+ }
+
+
@Override
public void cancel() {
this.isRunning = false;