| author | fy <2931945> | 2022-11-14 14:34:00 +0800 |
|---|---|---|
| committer | fy <2931945> | 2022-11-14 14:34:00 +0800 |
| commit | c58acdcfc94747b9e59c1125a016edc41aac5c15 (patch) | |
| tree | 59f2101bbef9bf7e71dce24d25c8a2827993b848 | |
| parent | 0a6f36393c5659f54c16514d9ce8ed6ff61e0239 (diff) | |
Initial preparation for the Flink-to-knowledge-base integration scheme
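At its core, the scheme pairs Nacos-triggered downloads with Flink's broadcast-state pattern: a parallelism-1 `HttpSource` pulls the knowledge files from HOS whenever the Nacos config changes, broadcasts them to every task, mirrors them to HDFS for job restarts, and each task rebuilds its in-memory lookup (e.g. the MaxMind `Reader`) from the broadcast bytes. Below is a minimal, self-contained sketch of that wiring, condensed from `TestKnowledge.java` in this commit; the class name, the stand-in element stream, and the literal server address are illustrative only.

```java
import com.alibaba.nacos.api.PropertyKeyConst;
import com.zdjizhi.common.CustomFile;
import com.zdjizhi.utils.HttpSource;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Properties;

// Illustrative sketch of the broadcast wiring prepared by this commit.
public class KnowledgeWiringSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties nacosProps = new Properties();
        nacosProps.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848"); // assumed address

        // state descriptor under which every task keeps the broadcast knowledge files
        MapStateDescriptor<String, List<CustomFile>> descriptor = new MapStateDescriptor<>(
                "knowledge", Types.STRING, Types.LIST(TypeInformation.of(CustomFile.class)));

        // one subtask pulls from HOS on every Nacos change, then broadcasts to all tasks
        BroadcastStream<List<CustomFile>> knowledge = env
                .addSource(new HttpSource(nacosProps, "knowledge_base.json", "DEFAULT_GROUP", 30000, "/test/"))
                .setParallelism(1)
                .broadcast(descriptor);

        env.fromElements("1.2.3.4", "5.6.7.8") // stand-in for the real record stream
                .connect(knowledge)
                .process(new BroadcastProcessFunction<String, List<CustomFile>, String>() {
                    @Override
                    public void processElement(String ip, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        out.collect(ip); // enrich the record with the current in-memory knowledge base
                    }

                    @Override
                    public void processBroadcastElement(List<CustomFile> files, Context ctx, Collector<String> out) throws Exception {
                        ctx.getBroadcastState(descriptor).put("files", files); // swap in fresh files
                    }
                }).print();

        env.execute("knowledge-broadcast-sketch");
    }
}
```

Keeping the source at parallelism 1 ensures only one subtask downloads from HOS; the HDFS mirror lets a restarted job bootstrap its lookup in `open()` without waiting for the next Nacos push.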
| -rw-r--r-- | pom.xml | 24 |
| -rw-r--r-- | src/main/java/com/zdjizhi/common/CustomFile.java | 5 |
| -rw-r--r-- | src/main/java/com/zdjizhi/sink/TrafficServerIpMetricsSink.java | 4 |
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/HdfsUtils.java | 56 |
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/HttpClientUtils2.java | 324 |
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/HttpSource.java | 139 |
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/IpLocationConfiguration.java | 21 |
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/KnowledgeBase.java | 32 |
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/KnowledgeConstant.java | 24 |
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/KnowledgeUtils.java | 95 |
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/LinkUtils.java | 75 |
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/NacosUtils.java | 1 |
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/NacosUtils2.java | 352 |
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/TestKnowledge.java | 160 |
| -rw-r--r-- | src/main/resources/common.properties | 8 |
15 files changed, 1315 insertions, 5 deletions
diff --git a/pom.xml b/pom.xml
--- a/pom.xml
+++ b/pom.xml
@@ -12,6 +12,7 @@
         <flink.version>1.13.1</flink.version>
         <hive.version>2.1.1</hive.version>
         <hadoop.version>2.7.1</hadoop.version>
+        <scala.binary.version>2.11</scala.binary.version>
     </properties>
 
     <repositories>
@@ -210,7 +211,7 @@
         <dependency>
             <groupId>com.zdjizhi</groupId>
             <artifactId>galaxy</artifactId>
-            <version>1.0.8</version>
+            <version>1.1.0</version>
             <exclusions>
                 <exclusion>
                     <artifactId>slf4j-log4j12</artifactId>
@@ -271,6 +272,27 @@
             <artifactId>guava</artifactId>
             <version>22.0</version>
         </dependency>
+
+
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.2</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path</artifactId>
+            <version>2.4.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+
     </dependencies>
diff --git a/src/main/java/com/zdjizhi/common/CustomFile.java b/src/main/java/com/zdjizhi/common/CustomFile.java
new file mode 100644
index 0000000..0c22eb8
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/CustomFile.java
@@ -0,0 +1,5 @@
+package com.zdjizhi.common;
+
+
+public class CustomFile {
+}
diff --git a/src/main/java/com/zdjizhi/sink/TrafficServerIpMetricsSink.java b/src/main/java/com/zdjizhi/sink/TrafficServerIpMetricsSink.java
index 575adb3..5f4a235 100644
--- a/src/main/java/com/zdjizhi/sink/TrafficServerIpMetricsSink.java
+++ b/src/main/java/com/zdjizhi/sink/TrafficServerIpMetricsSink.java
@@ -16,6 +16,10 @@ class TrafficServerIpMetricsSink {
         DataStream<DosMetricsLog> sideOutput = outputStream.getSideOutput(outputTag);
         sideOutput.map(JsonMapper::toJsonString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME))
                 .setParallelism(CommonConfig.KAFKA_OUTPUT_METRIC_PARALLELISM);
+
+
+
+
     }
 }
diff --git a/src/main/java/com/zdjizhi/utils/HdfsUtils.java b/src/main/java/com/zdjizhi/utils/HdfsUtils.java
new file mode 100644
index 0000000..e9484e7
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/HdfsUtils.java
@@ -0,0 +1,56 @@
+package com.zdjizhi.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class HdfsUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(HdfsUtils.class);
+
+    private static FileSystem fileSystem;
+
+    static {
+        Configuration configuration = new Configuration();
+        try {
+            // create the FileSystem used to talk to HDFS
+            fileSystem = FileSystem.get(configuration);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static byte[] getFileBytes(String filePath) throws IOException {
+        FSDataInputStream open = null;
+        try {
+            Path path = new Path(filePath);
+            // size the buffer from the file status; available() only reports
+            // what is currently buffered, not the full file length
+            int length = (int) fileSystem.getFileStatus(path).getLen();
+            byte[] bytes = new byte[length];
+            open = fileSystem.open(path);
+            open.readFully(0, bytes);
+            return bytes;
+        } finally {
+            if (open != null) {
+                open.close();
+            }
+        }
+    }
+
+    public static void uploadFileByBytes(String filePath, byte[] bytes) throws IOException {
+        FSDataOutputStream fsDataOutputStream = null;
+        try {
+            fsDataOutputStream = fileSystem.create(new Path(filePath), true);
+            fsDataOutputStream.write(bytes);
+        } finally {
+            if (fsDataOutputStream != null) {
+                fsDataOutputStream.close();
+            }
+        }
+    }
+}
diff --git a/src/main/java/com/zdjizhi/utils/HttpClientUtils2.java b/src/main/java/com/zdjizhi/utils/HttpClientUtils2.java
new file mode 100644
index 0000000..ddfd210
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/HttpClientUtils2.java
@@ -0,0 +1,324 @@
+package com.zdjizhi.utils;
+
+import com.zdjizhi.common.CommonConfig;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.*;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.HttpRequestRetryHandler;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.http.conn.ConnectionKeepAliveStrategy;
+import org.apache.http.conn.HttpHostConnectException;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.message.BasicHeaderElementIterator;
+import org.apache.http.protocol.HTTP;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLHandshakeException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/**
+ * HTTP client utility backed by a shared connection pool.
+ * @author wlh
+ */
+public class HttpClientUtils2 {
+
+    /** shared connection pool */
+    private static final PoolingHttpClientConnectionManager CONN_MANAGER = new PoolingHttpClientConnectionManager();
+
+    private static final Logger logger = LoggerFactory.getLogger(HttpClientUtils2.class);
+
+    public static final String ERROR_MESSAGE = "-1";
+
+    /*
+     * configure the connection pool once, in a static initializer
+     */
+    static {
+        // maximum number of pooled connections
+        CONN_MANAGER.setMaxTotal(CommonConfig.HTTP_POOL_MAX_CONNECTION);
+        // maximum connections per route
+        CONN_MANAGER.setDefaultMaxPerRoute(CommonConfig.HTTP_POOL_MAX_PER_ROUTE);
+    }
+
+    /**
+     * Build an HTTP client backed by the shared pool.
+     *
+     * @return pooled HTTP client
+     */
+    private static CloseableHttpClient getHttpClient() {
+        RequestConfig requestConfig = RequestConfig.custom()
+                // timeout for leasing a connection from the pool
+                .setConnectionRequestTimeout(CommonConfig.HTTP_POOL_REQUEST_TIMEOUT)
+                // TCP connect timeout
+                .setConnectTimeout(CommonConfig.HTTP_POOL_CONNECT_TIMEOUT)
+                // socket (response) timeout
+                .setSocketTimeout(CommonConfig.HTTP_POOL_RESPONSE_TIMEOUT)
+                .build();
+
+        /*
+         * Retry handler: decides, per exception type, whether a request
+         * should be retried; returning false disables the retry.
+         */
+        HttpRequestRetryHandler retry = (exception, executionCount, context) -> {
+            if (executionCount >= 3) {
+                // give up after three attempts
+                return false;
+            }
+            if (exception instanceof NoHttpResponseException) {
+                // the server dropped the connection: retry
+                return true;
+            }
+            if (exception instanceof SSLHandshakeException) {
+                // do not retry SSL handshake failures
+                return false;
+            }
+            if (exception instanceof UnknownHostException) {
+                // target host unreachable
+                return false;
+            }
+            if (exception instanceof ConnectTimeoutException) {
+                // connect timed out
+                return false;
+            }
+            if (exception instanceof HttpHostConnectException) {
+                // connection refused
+                return false;
+            }
+            if (exception instanceof SSLException) {
+                // other SSL errors
+                return false;
+            }
+            if (exception instanceof InterruptedIOException) {
+                // interrupted I/O (timeout): retry
+                return true;
+            }
+            HttpClientContext clientContext = HttpClientContext.adapt(context);
+            HttpRequest request = clientContext.getRequest();
+            // retry only if the request is idempotent (no enclosed entity)
+            return !(request instanceof HttpEntityEnclosingRequest);
+        };
+
+        ConnectionKeepAliveStrategy myStrategy = (response, context) -> {
+            HeaderElementIterator it = new BasicHeaderElementIterator(response.headerIterator(HTTP.CONN_KEEP_ALIVE));
+            while (it.hasNext()) {
+                HeaderElement he = it.nextElement();
+                String param = he.getName();
+                String value = he.getValue();
+                if (value != null && "timeout".equalsIgnoreCase(param)) {
+                    return Long.parseLong(value) * 1000;
+                }
+            }
+            // default to 60s when the server does not negotiate a keep-alive timeout
+            return 60 * 1000;
+        };
+
+        // assemble the client
+        return HttpClients.custom()
+                // apply the timeout settings
+                .setDefaultRequestConfig(requestConfig)
+                // apply the retry handler
+                .setRetryHandler(retry)
+                .setKeepAliveStrategy(myStrategy)
+                // use the shared connection pool
+                .setConnectionManager(CONN_MANAGER)
+                .build();
+    }
+
+    /**
+     * GET request.
+     *
+     * @param uri request URI
+     * @return response body, or ERROR_MESSAGE on failure
+     */
+    public static String httpGet(URI uri, Header... headers) {
+        String msg = ERROR_MESSAGE;
+
+        CloseableHttpClient httpClient = getHttpClient();
+        CloseableHttpResponse response = null;
+
+        try {
+            logger.info("http get uri {}", uri);
+            // build the GET request
+            HttpGet httpGet = new HttpGet(uri);
+
+            if (StringUtil.isNotEmpty(headers)) {
+                for (Header h : headers) {
+                    httpGet.addHeader(h);
+                    logger.info("request header : {}", h);
+                }
+            }
+            // execute the request
+            response = httpClient.execute(httpGet);
+            int statusCode = response.getStatusLine().getStatusCode();
+            // read the response entity
+            HttpEntity entity = response.getEntity();
+            msg = EntityUtils.toString(entity, "UTF-8");
+
+            if (statusCode != HttpStatus.SC_OK) {
+                logger.error("Http get content is :{}", msg);
+            }
+
+        } catch (ClientProtocolException e) {
+            logger.error("protocol error: {}", e.getMessage());
+        } catch (ParseException e) {
+            logger.error("parse error: {}", e.getMessage());
+        } catch (IOException e) {
+            logger.error("I/O error: {}", e.getMessage());
+        } finally {
+            if (null != response) {
+                try {
+                    EntityUtils.consume(response.getEntity());
+                    response.close();
+                } catch (IOException e) {
+                    logger.error("error releasing connection: {}", e.getMessage());
+                }
+            }
+        }
+
+        return msg;
+    }
+
+    /**
+     * POST request.
+     *
+     * @param uri         request URI
+     * @param requestBody request body
+     * @return response body, or ERROR_MESSAGE on failure
+     */
+    public static String httpPost(URI uri, String requestBody, Header... headers) {
+        String msg = ERROR_MESSAGE;
+        CloseableHttpClient httpClient = getHttpClient();
+
+        CloseableHttpResponse response = null;
+        try {
+            logger.info("http post uri:{}, http post body:{}", uri, requestBody);
+
+            HttpPost httpPost = new HttpPost(uri);
+            httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded");
+            if (StringUtil.isNotEmpty(headers)) {
+                for (Header h : headers) {
+                    httpPost.addHeader(h);
+                    logger.info("request header : {}", h);
+                }
+            }
+
+            if (StringUtil.isNotBlank(requestBody)) {
+                byte[] bytes = requestBody.getBytes(StandardCharsets.UTF_8);
+                httpPost.setEntity(new ByteArrayEntity(bytes));
+            }
+
+            response = httpClient.execute(httpPost);
+            int statusCode = response.getStatusLine().getStatusCode();
+            HttpEntity entity = response.getEntity();
+            msg = EntityUtils.toString(entity, "UTF-8");
+
+            if (statusCode != HttpStatus.SC_OK) {
+                logger.error("Http post content is :{}", msg);
+            }
+        } catch (ClientProtocolException e) {
+            logger.error("protocol error: {}", e.getMessage());
+        } catch (ParseException e) {
+            logger.error("parse error: {}", e.getMessage());
+        } catch (IOException e) {
+            logger.error("I/O error: {}", e.getMessage());
+        } finally {
+            if (null != response) {
+                try {
+                    EntityUtils.consumeQuietly(response.getEntity());
+                    response.close();
+                } catch (IOException e) {
+                    logger.error("error releasing connection: {}", e.getMessage());
+                }
+            }
+        }
+        return msg;
+    }
+
+    /**
+     * Assemble the path and query parameters onto the URIBuilder.
+     */
+    public static void setUrlWithParams(URIBuilder uriBuilder, String path, Map<String, Object> params) {
+        try {
+            uriBuilder.setPath(path);
+            if (params != null && !params.isEmpty()) {
+                for (Map.Entry<String, Object> kv : params.entrySet()) {
+                    uriBuilder.setParameter(kv.getKey(), kv.getValue().toString());
+                }
+            }
+        } catch (Exception e) {
+            logger.error("failed to build url, uri: {}, path: {}, params: {}", uriBuilder.toString(), path, params);
+        }
+    }
+
+    // TODO: 2022/10/19 used for loading the knowledge base
+    public InputStream httpGetInputStream(String url, int socketTimeout, Header... headers) {
+        InputStream result = null;
+        // TODO: 2022/10/19 socketTimeout is currently ignored; the pooled client's defaults apply
+        CloseableHttpClient httpClient = getHttpClient();
+        // build the GET request
+        HttpGet httpGet = new HttpGet(url);
+        if (StringUtil.isNotEmpty(headers)) {
+            for (Header h : headers) {
+                httpGet.addHeader(h);
+            }
+        }
+        CloseableHttpResponse response = null;
+
+        try {
+            // execute the request
+            response = httpClient.execute(httpGet);
+            // buffer the whole entity so the connection can be released immediately
+            result = IOUtils.toBufferedInputStream(response.getEntity().getContent());
+        } catch (ClientProtocolException e) {
+            logger.error("current file: {}, protocol error: {}", url, e.getMessage());
+        } catch (ParseException e) {
+            logger.error("current file: {}, parser error: {}", url, e.getMessage());
+        } catch (IOException e) {
+            logger.error("current file: {}, I/O error: {}", url, e.getMessage());
+        } finally {
+            if (null != response) {
+                try {
+                    EntityUtils.consume(response.getEntity());
+                    response.close();
+                } catch (IOException e) {
+                    logger.error("error releasing connection: {}", e.getMessage());
+                }
+            }
+        }
+        // return outside the finally block so exceptions are not silently swallowed
+        return result;
+    }
+}
diff --git a/src/main/java/com/zdjizhi/utils/HttpSource.java b/src/main/java/com/zdjizhi/utils/HttpSource.java
new file mode 100644
index 0000000..8891645
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/HttpSource.java
@@ -0,0 +1,139 @@
+package com.zdjizhi.utils;
+
+import com.alibaba.nacos.api.NacosFactory;
+import com.alibaba.nacos.api.PropertyKeyConst;
+import com.alibaba.nacos.api.config.ConfigService;
+import com.alibaba.nacos.api.config.listener.Listener;
+import com.fasterxml.jackson.databind.JavaType;
+import com.zdjizhi.common.CommonConfig;
+import com.zdjizhi.common.CustomFile;
+import com.zdjizhi.common.KnowledgeLog;
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.http.Header;
+import org.apache.http.message.BasicHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+/**
+ * Flink source that downloads the knowledge-base files from HOS whenever
+ * the watched Nacos config changes.
+ */
+public class HttpSource extends RichSourceFunction<List<CustomFile>> {
+
+    private static final Logger logger = LoggerFactory.getLogger(HttpSource.class);
+
+    // connection properties for Nacos
+    private Properties nacosProperties;
+
+    // Nacos data id
+    private String NACOS_DATA_ID;
+
+    // Nacos group
+    private String NACOS_GROUP;
+
+    // Nacos read timeout
+    private long NACOS_READ_TIMEOUT;
+
+    // HDFS path the knowledge files are mirrored to
+    private String STORE_PATH;
+
+    private ConfigService configService;
+
+    private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
+    private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeLog.class);
+
+    private volatile boolean isRunning = true;
+
+    public HttpSource(Properties nacosProperties, String NACOS_DATA_ID, String NACOS_GROUP, long NACOS_READ_TIMEOUT, String storePath) {
+        this.nacosProperties = nacosProperties;
+        this.NACOS_DATA_ID = NACOS_DATA_ID;
+        this.NACOS_GROUP = NACOS_GROUP;
+        this.NACOS_READ_TIMEOUT = NACOS_READ_TIMEOUT;
+        this.STORE_PATH = storePath;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        logger.info("connecting to nacos: " + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR));
+        configService = NacosFactory.createConfigService(nacosProperties);
+    }
+
+    @Override
+    public void run(SourceContext<List<CustomFile>> ctx) throws Exception {
+        String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
+        logger.info("received config from nacos: " + config);
+        // after reading the config from Nacos, download the knowledge files from HOS into memory
+        List<CustomFile> customFiles = loadKnowledge(config);
+        // hand the knowledge files downstream, where they will be broadcast
+        ctx.collectWithTimestamp(customFiles, System.currentTimeMillis());
+
+        configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() {
+            @Override
+            public Executor getExecutor() {
+                return null;
+            }
+
+            @Override
+            public void receiveConfigInfo(String configMsg) {
+                try {
+                    logger.info("received updated config: " + configMsg);
+                    List<CustomFile> updatedFiles = loadKnowledge(configMsg);
+                    ctx.collectWithTimestamp(updatedFiles, System.currentTimeMillis());
+                    // re-upload the refreshed files to HDFS so restarted jobs can bootstrap from there
+                    for (CustomFile customFile : updatedFiles) {
+                        logger.info("begin upload to hdfs: " + STORE_PATH + customFile.getFileName());
+                        HdfsUtils.uploadFileByBytes(STORE_PATH + customFile.getFileName(), customFile.getContent());
+                        logger.info(STORE_PATH + customFile.getFileName() + " upload finished");
+                    }
+                } catch (Exception e) {
+                    logger.error("failed to handle nacos config update", e);
+                }
+            }
+        });
+
+        // keep the source alive; further updates are pushed through the listener above
+        while (isRunning) {
+            Thread.sleep(1000);
+        }
+    }
+
+    // download the knowledge files listed in the config from HOS into memory
+    public List<CustomFile> loadKnowledge(String config) {
+        List<CustomFile> customFiles = new ArrayList<>();
+        List<KnowledgeLog> knowledges = jsonMapperInstance.fromJson(config, listType);
+        for (KnowledgeLog knowledge : knowledges) {
+            CustomFile customFile = new CustomFile();
+            String fileName = knowledge.getName().concat(".").concat(knowledge.getFormat());
+            customFile.setFileName(fileName);
+            InputStream inputStream = null;
+            try {
+                Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
+                HttpClientUtils httpClientUtils = new HttpClientUtils();
+                inputStream = httpClientUtils.httpGetInputStream(knowledge.getPath(), 3000, header);
+                byte[] bytes = IOUtils.toByteArray(inputStream);
+                customFile.setContent(bytes);
+            } catch (IOException ioException) {
+                logger.error("failed to download knowledge file " + fileName, ioException);
+            } finally {
+                IOUtils.closeQuietly(inputStream);
+            }
+            customFiles.add(customFile);
+        }
+        return customFiles;
+    }
+
+    @Override
+    public void cancel() {
+        this.isRunning = false;
+    }
+}
diff --git a/src/main/java/com/zdjizhi/utils/IpLocationConfiguration.java b/src/main/java/com/zdjizhi/utils/IpLocationConfiguration.java
new file mode 100644
index 0000000..1141400
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/IpLocationConfiguration.java
@@ -0,0 +1,21 @@
+package com.zdjizhi.utils;
+
+import lombok.Data;
+
+/**
+ * Configured versions for the four IP location databases.
+ *
+ * @author fy
+ * @version 1.0
+ * @date 2022/10/19 18:27
+ */
+@Data
+public class IpLocationConfiguration {
+
+    private String ipV4UserDefined;
+
+    private String ipV4BuiltIn;
+
+    private String ipV6UserDefined;
+
+    private String ipV6BuiltIn;
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/KnowledgeBase.java b/src/main/java/com/zdjizhi/utils/KnowledgeBase.java
new file mode 100644
index 0000000..f4198f5
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/KnowledgeBase.java
@@ -0,0 +1,32 @@
+package com.zdjizhi.utils;
+
+import lombok.Data;
+
+/**
+ * Metadata for one knowledge-base file as listed in the Nacos config.
+ *
+ * @author fy
+ * @version 1.0
+ * @date 2022/10/19 18:27
+ */
+@Data
+public class KnowledgeBase {
+
+    private String id;
+
+    private String name;
+
+    private String type;
+
+    private String path;
+
+    private Long size;
+
+    private String format;
+
+    private String sha256;
+
+    private String origin_url;
+
+    private String version;
+
+    private String updateTime;
+}
diff --git a/src/main/java/com/zdjizhi/utils/KnowledgeConstant.java b/src/main/java/com/zdjizhi/utils/KnowledgeConstant.java
new file mode 100644
index 0000000..c674bb6
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/KnowledgeConstant.java
@@ -0,0 +1,24 @@
+package com.zdjizhi.utils;
+
+/**
+ * @author fy
+ * @version 1.0
+ * @date 2022/10/19 18:25
+ * File-name constants for the knowledge bases.
+ */
+public class KnowledgeConstant {
+
+    public static final String DAT = "dat";
+    public static final String IP_USER_DEFINED_V4 = "ip_v4_user_defined";
+    public static final String IP_USER_DEFINED_V6 = "ip_v6_user_defined";
+    public static final String IP_BUILT_IN_V4 = "ip_v4_built_in";
+    public static final String IP_BUILT_IN_V6 = "ip_v6_built_in";
+    public static final String IP_USER_DEFINED_V4_MMDB = "ip_v4_user_defined.mmdb";
+    public static final String IP_USER_DEFINED_V6_MMDB = "ip_v6_user_defined.mmdb";
+    public static final String IP_BUILT_IN_V4_MMDB = "ip_v4_built_in.mmdb";
+    public static final String IP_BUILT_IN_V6_MMDB = "ip_v6_built_in.mmdb";
+    public static final String LATEST = "latest";
+    public static final String TOKEN = "token";
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/KnowledgeUtils.java b/src/main/java/com/zdjizhi/utils/KnowledgeUtils.java
new file mode 100644
index 0000000..b9ec7e0
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/KnowledgeUtils.java
@@ -0,0 +1,95 @@
+package com.zdjizhi.utils;
+
+import cn.hutool.core.io.IoUtil;
+import com.fasterxml.jackson.databind.JavaType;
+import com.zdjizhi.common.CommonConfig;
+import org.apache.http.Header;
+import org.apache.http.message.BasicHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author fy
+ * @version 1.0
+ * @date 2022/10/19 14:07
+ */
+public class KnowledgeUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(KnowledgeUtils.class);
+
+    private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
+    private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeBase.class);
+
+    // sha256 of files already downloaded, keyed by knowledge id
+    private static Map<String, String> updateMap = new HashMap<>();
+
+    public Map<String, KnowledgeBase> queryLoadKnowledgeInfo(String knowledgeMetadata, IpLocationConfiguration ipLocationConfiguration) {
+
+        logger.info("knowledge base versions: {}, knowledge base metadata: {}", ipLocationConfiguration, knowledgeMetadata);
+        Map<String, KnowledgeBase> map = new HashMap<>();
+
+        if (StringUtil.isNotEmpty(ipLocationConfiguration) && StringUtil.isNotEmpty(knowledgeMetadata)) {
+            List<KnowledgeBase> knowledgeBaseList = jsonMapperInstance.fromJson(knowledgeMetadata, listType);
+
+            String ipV4BuiltIn = ipLocationConfiguration.getIpV4BuiltIn();
+            String ipV4UserDefined = ipLocationConfiguration.getIpV4UserDefined();
+            String ipV6BuiltIn = ipLocationConfiguration.getIpV6BuiltIn();
+            String ipV6UserDefined = ipLocationConfiguration.getIpV6UserDefined();
+
+            for (KnowledgeBase knowledgeBase : knowledgeBaseList) {
+                String name = knowledgeBase.getName();
+                String version = knowledgeBase.getVersion();
+                // configured entries are either "name:version" or a bare name meaning "latest"
+                String concat = name.concat(":").concat(version);
+
+                if (StringUtil.equals(concat, ipV4BuiltIn)
+                        || (StringUtil.equals(name, ipV4BuiltIn) && StringUtil.equals(version, KnowledgeConstant.LATEST))) {
+                    map.put(KnowledgeConstant.IP_BUILT_IN_V4, knowledgeBase);
+                } else if (StringUtil.equals(concat, ipV4UserDefined)
+                        || (StringUtil.equals(name, ipV4UserDefined) && StringUtil.equals(version, KnowledgeConstant.LATEST))) {
+                    map.put(KnowledgeConstant.IP_USER_DEFINED_V4, knowledgeBase);
+                } else if (StringUtil.equals(concat, ipV6BuiltIn)
+                        || (StringUtil.equals(name, ipV6BuiltIn) && StringUtil.equals(version, KnowledgeConstant.LATEST))) {
+                    map.put(KnowledgeConstant.IP_BUILT_IN_V6, knowledgeBase);
+                } else if (StringUtil.equals(concat, ipV6UserDefined)
+                        || (StringUtil.equals(name, ipV6UserDefined) && StringUtil.equals(version, KnowledgeConstant.LATEST))) {
+                    map.put(KnowledgeConstant.IP_USER_DEFINED_V6, knowledgeBase);
+                }
+            }
+        }
+        return map;
+    }
+
+    private void download(KnowledgeBase knowledgeBase) {
+        String id = knowledgeBase.getId();
+        String name = knowledgeBase.getName();
+        String sha256 = knowledgeBase.getSha256();
+        String format = knowledgeBase.getFormat();
+        FileOutputStream outputStream = null;
+        InputStream inputStream = null;
+        try {
+            // token and timeout follow HttpSource#loadKnowledge in this commit
+            Header header = new BasicHeader(KnowledgeConstant.TOKEN, CommonConfig.HOS_TOKEN);
+            HttpClientUtils httpClientUtils = new HttpClientUtils();
+            inputStream = httpClientUtils.httpGetInputStream(knowledgeBase.getPath(), 3000, header);
+            File file = new File(KnowledgeConstant.DAT.concat(File.separator).concat(name).concat(".").concat(format));
+            outputStream = new FileOutputStream(file);
+            IoUtil.copy(inputStream, outputStream);
+            logger.info("knowledge downloaded, name: {}, version: {}", name, knowledgeBase.getVersion());
+        } catch (IOException e) {
+            logger.error("file not found", e);
+        } finally {
+            IoUtil.close(inputStream);
+            IoUtil.close(outputStream);
+        }
+        updateMap.put(id, sha256);
+    }
+}
diff --git a/src/main/java/com/zdjizhi/utils/LinkUtils.java b/src/main/java/com/zdjizhi/utils/LinkUtils.java
new file mode 100644
index 0000000..01510be
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/LinkUtils.java
@@ -0,0 +1,75 @@
+package com.zdjizhi.utils;
+
+import com.opencsv.CSVParser;
+import com.opencsv.CSVReader;
+import com.zdjizhi.base.common.CnRecordLog;
+import com.zdjizhi.base.common.CommonConfig;
+import com.zdjizhi.etl.common.LinkAndIspInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Set;
+
+/**
+ * @author pengfeixu
+ * @date 2022/3/30
+ */
+public class LinkUtils {
+
+    private static Logger LOG = LoggerFactory.getLogger(LinkUtils.class);
+
+    public static HashMap<Long, LinkAndIspInfo> linkmap = new HashMap<>();
+
+    public static void readLinkCsv(String path) {
+        // skip the header row, then index every record by its link id (column 6)
+        try (CSVReader csvReader = new CSVReader(
+                new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8),
+                CSVParser.DEFAULT_SEPARATOR, CSVParser.DEFAULT_QUOTE_CHARACTER,
+                CSVParser.DEFAULT_ESCAPE_CHARACTER, 1)) {
+
+            for (String[] strs : csvReader) {
+                long key = Long.parseLong(strs[6]);
+                LinkAndIspInfo linkDirectionInfo = new LinkAndIspInfo();
+                linkDirectionInfo.setEgress_link_direction(strs[3]);
+                linkDirectionInfo.setIngress_link_direction(strs[3]);
+                linkDirectionInfo.setLink_id(Integer.parseInt(strs[6]));
+                linkmap.put(key, linkDirectionInfo);
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage());
+        }
+    }
+
+    public static void setLinkMessage(CnRecordLog cnRecordLog) {
+        // if either link id is unknown, reset both before the direction lookups
+        if (!linkmap.containsKey(cnRecordLog.getCommon_egress_link_id()) || !linkmap.containsKey(cnRecordLog.getCommon_ingress_link_id())) {
+            cnRecordLog.setCommon_egress_link_id(0);
+            cnRecordLog.setCommon_ingress_link_id(0);
+        }
+
+        if (linkmap.containsKey(cnRecordLog.getCommon_egress_link_id())) {
+            cnRecordLog.setEgress_link_direction(linkmap.get(cnRecordLog.getCommon_egress_link_id()).getEgress_link_direction());
+        }
+        if (linkmap.containsKey(cnRecordLog.getCommon_ingress_link_id())) {
+            cnRecordLog.setIngress_link_direction(linkmap.get(cnRecordLog.getCommon_ingress_link_id()).getIngress_link_direction());
+        }
+    }
+
+    public static void main(String[] args) {
+        Set<Long> keySet = linkmap.keySet();
+        for (long key : keySet) {
+            System.out.println(linkmap.get(key).toString());
+        }
+        System.out.println(linkmap.size());
+    }
+}
diff --git a/src/main/java/com/zdjizhi/utils/NacosUtils.java b/src/main/java/com/zdjizhi/utils/NacosUtils.java
index cdefd95..6adfc23 100644
--- a/src/main/java/com/zdjizhi/utils/NacosUtils.java
+++ b/src/main/java/com/zdjizhi/utils/NacosUtils.java
@@ -44,6 +44,7 @@ public class NacosUtils {
         String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
         commonProperties.load(new StringReader(config));
 
+
         configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() {
             @Override
             public Executor getExecutor() {
diff --git a/src/main/java/com/zdjizhi/utils/NacosUtils2.java b/src/main/java/com/zdjizhi/utils/NacosUtils2.java
new file mode 100644
index 0000000..89add4b
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/NacosUtils2.java
@@ -0,0 +1,352 @@
+package com.zdjizhi.utils;
+
+import cn.hutool.core.io.IoUtil;
+import com.alibaba.nacos.api.NacosFactory;
+import com.alibaba.nacos.api.PropertyKeyConst;
+import com.alibaba.nacos.api.config.ConfigService;
+import com.alibaba.nacos.api.config.listener.Listener;
+import com.fasterxml.jackson.databind.JavaType;
+//import com.zdjizhi.base.common.CommonConfig;
+//import com.zdjizhi.base.common.KnowledgeConstant;
+//import com.zdjizhi.base.common.KnowledgeLog;
+//import com.zdjizhi.base.utils.CommonConfigurations;
+//import com.zdjizhi.base.utils.HttpClientUtils;
+import org.apache.http.Header;
+import org.apache.http.message.BasicHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.Executor;
+
+
+public class NacosUtils2 {
+
+    private static final Logger logger = LoggerFactory.getLogger(NacosUtils2.class);
+
+    private static CommonConfigurations configurations;
+
+    /*
+     * Load common.properties first: static initializers run in source order,
+     * and the Nacos constants below depend on `configurations`.
+     */
+    static {
+        try {
+            Properties propService = new Properties();
+            propService.load(CommonConfigurations.class.getClassLoader().getResourceAsStream("common.properties"));
+            configurations = new CommonConfigurations(propService);
+        } catch (Exception e) {
+            logger.error("failed to load common.properties");
+            System.exit(1);
+        }
+    }
+
+    private static Properties nacosProperties = new Properties();
+    private static Properties commonProperties = new Properties();
+
+    // sha256 of files already downloaded to the server, keyed by knowledge id
+    private static Map<String, String> updateMap = new HashMap<>();
+
+    private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
+    private static JavaType listType = jsonMapperInstance.createCollectionType(List.class, KnowledgeLog.class);
+
+    private static final String NACOS_SERVER_ADDR = configurations.getStringProperty("nacos.server.addr");
+    private static final String NACOS_NAMESPACE = configurations.getStringProperty("nacos.namespace");
+    private static final String NACOS_USERNAME = configurations.getStringProperty("nacos.username");
+    private static final String NACOS_PASSWORD = configurations.getStringProperty("nacos.password");
+    private static final String NACOS_DATA_ID = configurations.getStringProperty("nacos.data.id");
+    private static final String NACOS_GROUP = configurations.getStringProperty("nacos.group");
+    private static final long NACOS_READ_TIMEOUT = configurations.getLongProperty("nacos.read.timeout");
+
+    static {
+        createConfigService();
+    }
+
+    private static void getProperties() {
+        nacosProperties.setProperty(PropertyKeyConst.SERVER_ADDR, NACOS_SERVER_ADDR);
+        nacosProperties.setProperty(PropertyKeyConst.NAMESPACE, NACOS_NAMESPACE);
+        nacosProperties.setProperty(PropertyKeyConst.USERNAME, NACOS_USERNAME);
+        nacosProperties.setProperty(PropertyKeyConst.PASSWORD, NACOS_PASSWORD);
+    }
+
+    public static void createConfigService() {
+        try {
+            getProperties();
+            ConfigService configService = NacosFactory.createConfigService(nacosProperties);
+            String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
+//            commonProperties.load(new StringReader(config));
+
+            loadKnowledge(config);
+
+            configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() {
+                @Override
+                public Executor getExecutor() {
+                    return null;
+                }
+
+                @Override
+                public void receiveConfigInfo(String configMsg) {
+                    try {
+                        loadKnowledge(configMsg);
+                    } catch (Exception e) {
+                        logger.error("failed to handle nacos config update", e);
+                    }
+                }
+            });
+        } catch (Exception e) {
+            logger.error("failed to fetch nacos config", e);
+        }
+    }
+
+    // load the Nacos config: download changed files, then (re)initialize the lookups
+    public static void loadKnowledge(String config) {
+        Map<String, KnowledgeLog> knowledgeLogMap = queryLoadKnowledgeInfo(config);
+
+        for (Map.Entry<String, KnowledgeLog> knowledgeLogEntry : knowledgeLogMap.entrySet()) {
+            KnowledgeLog knowledgeLog = knowledgeLogEntry.getValue();
+            // only download files whose sha256 changed since the last round
+            if (!StringUtil.equals(knowledgeLog.getSha256(), updateMap.get(knowledgeLog.getId()))) {
+                download(knowledgeLog);
+            }
+        }
+        Map<String, String> path = queryPath(knowledgeLogMap);
+
+        // initialize the IP location lookup and the other knowledge tables
+        IpUtils.initIpLookup(path.get(KnowledgeConstant.IP_BUILT_IN_V4_MMDB), path.get(KnowledgeConstant.ASN_V4), path.get(KnowledgeConstant.ASN_V6));
+        IdcRenterUtils.readServerCsv(path.get(KnowledgeConstant.ISP));
+        LinkUtils.readLinkCsv(path.get(KnowledgeConstant.LINK));
+        IspUtils.readServerCsv(path.get(KnowledgeConstant.ISP));
+        FcUtils.readCsv(path.get(KnowledgeConstant.WEBSKT));
+        FcUtils.readIcpCsv(path.get(KnowledgeConstant.ICP));
+        WhoisUtils.readJson(path.get(KnowledgeConstant.WHOIS));
+        DnsServerUtils.readServerCsv(path.get(KnowledgeConstant.DNSPATH));
+        AppUtils.readJson(path.get(KnowledgeConstant.APPSKT));
+        IPUtils.readInternalIpCsv(path.get(KnowledgeConstant.INTERNALIP));
+    }
+
+    // extract, from the knowledge-base JSON, the latest entry for every file we care about
+    public static Map<String, KnowledgeLog> queryLoadKnowledgeInfo(String content) {
+        Map<String, KnowledgeLog> map = new HashMap<>();
+
+        List<KnowledgeLog> knowledgeLogs = jsonMapperInstance.fromJson(content, listType);
+        for (KnowledgeLog knowledgeLog : knowledgeLogs) {
+            if (!KnowledgeConstant.LATEST.equals(knowledgeLog.getVersion())) {
+                continue;
+            }
+            // each constant doubles as the expected "name.format" file name and the map key
+            String fileName = knowledgeLog.getName().concat(".").concat(knowledgeLog.getFormat());
+            if (KnowledgeConstant.IP_BUILT_IN_V4_MMDB.equals(fileName)
+                    || KnowledgeConstant.ASN_V4.equals(fileName)
+                    || KnowledgeConstant.ASN_V6.equals(fileName)
+                    || KnowledgeConstant.WEBSKT.equals(fileName)
+                    || KnowledgeConstant.DNSPATH.equals(fileName)
+                    || KnowledgeConstant.APPSKT.equals(fileName)
+                    || KnowledgeConstant.WHOIS.equals(fileName)
+                    || KnowledgeConstant.ICP.equals(fileName)
+                    || KnowledgeConstant.IOC.equals(fileName)
+                    || KnowledgeConstant.LINK.equals(fileName)
+                    || KnowledgeConstant.ISP.equals(fileName)
+                    || KnowledgeConstant.IDCRENTER.equals(fileName)
+                    || KnowledgeConstant.INTERNALIP.equals(fileName)) {
+                map.put(fileName, knowledgeLog);
+            }
+        }
+        return map;
+    }
+
+    // resolve the on-disk path for each knowledge file, falling back to the bundled
+    // default when the entry is missing or the file has not been downloaded yet
+    private static Map<String, String> queryPath(Map<String, KnowledgeLog> map) {
+        Map<String, String> pathMap = new HashMap<>();
+        String[] keys = {
+                KnowledgeConstant.IP_BUILT_IN_V4_MMDB, KnowledgeConstant.ASN_V4, KnowledgeConstant.ASN_V6,
+                KnowledgeConstant.WEBSKT, KnowledgeConstant.DNSPATH, KnowledgeConstant.APPSKT,
+                KnowledgeConstant.WHOIS, KnowledgeConstant.ICP, KnowledgeConstant.IOC,
+                KnowledgeConstant.LINK, KnowledgeConstant.ISP, KnowledgeConstant.IDCRENTER,
+                KnowledgeConstant.INTERNALIP
+        };
+        for (String key : keys) {
+            pathMap.put(key, resolvePath(map.get(key), key));
+        }
+        return pathMap;
+    }
+
+    private static String resolvePath(KnowledgeLog knowledgeLog, String defaultFileName) {
+        if (StringUtil.isEmpty(knowledgeLog)) {
+            return CommonConfig.IP_PATH + File.separator + defaultFileName;
+        }
+        String fileName = knowledgeLog.getName().concat(".").concat(knowledgeLog.getFormat());
+        if (!new File(CommonConfig.IP_PATH.concat(fileName)).exists()) {
+            return CommonConfig.IP_PATH + File.separator + defaultFileName;
+        }
+        return CommonConfig.IP_PATH.concat(File.separator).concat(fileName);
+    }
+
+    // download one knowledge file from HOS to the local disk
+    public static void download(KnowledgeLog knowledgeLog) {
+        String id = knowledgeLog.getId();
+        String sha256 = knowledgeLog.getSha256();
+        InputStream inputStream = null;
+        FileOutputStream outputStream = null;
+
+        try {
+            Header header = new BasicHeader("token", CommonConfig.TOKEN);
+            HttpClientUtils httpClientUtils = new HttpClientUtils();
+            inputStream = httpClientUtils.httpGetInputStream(knowledgeLog.getPath(), 3000, header);
+            File file = new File(CommonConfig.IP_PATH.concat(File.separator).concat(knowledgeLog.getName()).concat(".").concat(knowledgeLog.getFormat()));
+            outputStream = new FileOutputStream(file);
+            IoUtil.copy(inputStream, outputStream);
+        } catch (FileNotFoundException e) {
+            logger.error("failed to create local knowledge file", e);
+        } finally {
+            IoUtil.close(inputStream);
+            IoUtil.close(outputStream);
+        }
+        updateMap.put(id, sha256);
+    }
+
+    public static String getStringProperty(String key) {
+        return commonProperties.getProperty(key);
+    }
+
+    public static Integer getIntProperty(String key) {
+        return Integer.parseInt(commonProperties.getProperty(key));
+    }
+
+    public static Double getDoubleProperty(String key) {
+        return Double.parseDouble(commonProperties.getProperty(key));
+    }
+
+    public static Long getLongProperty(String key) {
+        return Long.parseLong(commonProperties.getProperty(key));
+    }
+
+    public static Boolean getBooleanProperty(String key) {
+        return "true".equals(commonProperties.getProperty(key).toLowerCase().trim());
+    }
+}
diff --git a/src/main/java/com/zdjizhi/utils/TestKnowledge.java b/src/main/java/com/zdjizhi/utils/TestKnowledge.java
new file mode 100644
index 0000000..0456be6
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/TestKnowledge.java
@@ -0,0 +1,160 @@
+package com.zdjizhi.utils;
+
+import com.alibaba.nacos.api.PropertyKeyConst;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.maxmind.db.CHMCache;
+import com.maxmind.db.Reader;
+import com.zdjizhi.common.CustomFile;
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.util.*;
+
+public class TestKnowledge {
+
+    @Test
+    public void jobTest() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // Nacos connection settings
+        Properties nacosProperties = new Properties();
+        // HDFS storage path
+        String STORE_PATH = "/test/";
+        nacosProperties.put(PropertyKeyConst.SERVER_ADDR, "192.168.44.12");
+        nacosProperties.setProperty(PropertyKeyConst.USERNAME, "nacos");
+        nacosProperties.setProperty(PropertyKeyConst.PASSWORD, "nacos");
+        // stream that carries the knowledge-base files
+        DataStreamSource<List<CustomFile>> customFileDataStreamSource = env.addSource(new HttpSource(nacosProperties, "knowledge_base.json", "DEFAULT_GROUP", 30000, STORE_PATH));
+        // parallelism 1: anything higher would make several subtasks pull the files concurrently
+        customFileDataStreamSource.setParallelism(1);
+
+        // state descriptor under which the broadcast data is stored
+        MapStateDescriptor<String, List<CustomFile>> descriptor =
+                new MapStateDescriptor<>("descriptorTest", Types.STRING, Types.LIST(TypeInformation.of(CustomFile.class)));
+        // broadcast the stream
+        BroadcastStream<List<CustomFile>> broadcast = customFileDataStreamSource.broadcast(descriptor);
+
+        // connect the data stream with the broadcast stream
+        BroadcastConnectedStream<String, List<CustomFile>> connect = env.addSource(new MySource()).setParallelism(1).connect(broadcast);
+
+        connect.process(new BroadcastProcessFunction<String, List<CustomFile>, Tuple2<String, String>>() {
+            Reader reader = null;
+
+            // initialization: pull the knowledge base from HDFS
+            @Override
+            public void open(Configuration parameters) throws Exception {
+                super.open(parameters);
+                System.out.println("begin init");
+                byte[] fileBytes = HdfsUtils.getFileBytes(STORE_PATH + "ip_v4.mmdb");
+                InputStream inputStream = new ByteArrayInputStream(fileBytes);
+                this.reader = new Reader(inputStream, new CHMCache());
+                System.out.println("init over");
+            }
+
+            // per-record processing: look the IP up in the current mmdb reader
+            @Override
+            public void processElement(String value, BroadcastProcessFunction<String, List<CustomFile>, Tuple2<String, String>>.ReadOnlyContext ctx, Collector<Tuple2<String, String>> out) throws Exception {
+                InetAddress ipAddress = InetAddress.getByName(value);
+                String result = "ip not found";
+                JsonNode jsonNode = reader.get(ipAddress);
+                if (jsonNode != null) {
+                    result = jsonNode.toString();
+                }
+                out.collect(Tuple2.of(value, result));
+            }
+
+            // broadcast-side processing, i.e. the knowledge-base update logic
+            @Override
+            public void processBroadcastElement(List<CustomFile> value, BroadcastProcessFunction<String, List<CustomFile>, Tuple2<String, String>>.Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
+                BroadcastState<String, List<CustomFile>> broadcastState = ctx.getBroadcastState(descriptor);
+                broadcastState.clear();
+                broadcastState.put("test", value);
+                InputStream inputStream = null;
+                for (CustomFile customFile : value) {
+                    if ("ip_v4_built_in.mmdb".equals(customFile.getFileName())) {
+                        inputStream = new ByteArrayInputStream(customFile.getContent());
+                    }
+                }
+                // swap in a reader built from the freshly broadcast bytes
+                this.reader = new Reader(inputStream, new CHMCache());
+            }
+        }).setParallelism(1).addSink(new CollectSink1());
+
+        // note: MySource is unbounded, so execute() blocks until the job is cancelled
+        env.execute();
+
+        for (Tuple2<String, String> value : CollectSink1.values) {
+            System.out.println(value);
+        }
+        System.out.println(CollectSink1.values);
+    }
+
+    // custom sink for the test
+    public static class CollectSink implements SinkFunction<CustomFile> {
+
+        public static final List<CustomFile> values = Collections.synchronizedList(new ArrayList<CustomFile>());
+
+        public void invoke(CustomFile value, Context context) throws Exception {
+            values.add(value);
+        }
+    }
+
+    // custom sink for the test
+    public static class CollectSink1 implements SinkFunction<Tuple2<String, String>> {
+
+        public static final List<Tuple2<String, String>> values = Collections.synchronizedList(new ArrayList<Tuple2<String, String>>());
+
+        public void invoke(Tuple2<String, String> value, Context context) throws Exception {
+            values.add(value);
+        }
+    }
+
+    // custom source for the test: emits one random IPv4 address per second
+    public static class MySource implements SourceFunction<String> {
+
+        private volatile boolean isRunning = true;
+
+        @Override
+        public void run(SourceContext<String> ctx) throws Exception {
+            System.out.println("MySource begin");
+            while (isRunning) {
+                Random r = new Random();
+                String randomIp = r.nextInt(255) + "." + r.nextInt(255) + "." + r.nextInt(255) + "." + r.nextInt(255);
+                ctx.collect(randomIp);
+                Thread.sleep(1000);
+            }
+        }
+
+        @Override
+        public void cancel() {
+            isRunning = false;
+        }
+    }
+}
diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties
index 819af84..7390f33 100644
--- a/src/main/resources/common.properties
+++ b/src/main/resources/common.properties
@@ -74,7 +74,7 @@ destination.ip.partition.num=10000
 data.center.id.num=15
 
 # path to the IP mmdb databases
-ip.mmdb.path=D:\\data\\dat\\
+ip.mmdb.path=D:\\data\\dat\\bak\\
 #ip.mmdb.path=/home/bigdata/topology/dat/
 #ip.mmdb.path=/home/bigdata/wlh/topology/dos-detection/dat/
 
@@ -130,9 +130,9 @@ sasl.jaas.config.flag=1
 
 # Nacos configuration
 nacos.server.addr=192.168.44.12:8848
-nacos.namespace=test
+nacos.namespace=public
 nacos.username=nacos
 nacos.password=nacos
-nacos.data.id=dos_detection.properties
-nacos.group=Galaxy
+nacos.data.id=knowledge_base.json
+nacos.group=DEFAULT_GROUP
 nacos.read.timeout=5000
\ No newline at end of file
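For reference, the `knowledge_base.json` config that the job now watches (see the `nacos.data.id` change above) is deserialized into a list of `KnowledgeLog`/`KnowledgeBase` entries, so each element carries the fields those classes declare. A hypothetical single-entry payload, with all values invented purely for illustration, might look like:

```json
[
  {
    "id": "1",
    "name": "ip_v4_built_in",
    "type": "ip",
    "path": "http://hos-server/knowledge/ip_v4_built_in.mmdb",
    "size": 1048576,
    "format": "mmdb",
    "sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
    "origin_url": "",
    "version": "latest",
    "updateTime": "2022-11-14 14:34:00"
  }
]
```

Entries whose `version` is `latest` and whose `name.format` matches one of the expected knowledge file names are downloaded whenever their `sha256` differs from the previously recorded one.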
