diff options
| author | wangkuan <[email protected]> | 2023-12-08 19:07:58 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2023-12-08 19:07:58 +0800 |
| commit | 4fc6b233b2ef82f22ce362c56b3656909f579c38 (patch) | |
| tree | c78bb770edd8dcab2624f57834e45a52045817ca | |
| parent | 770273ae65d55dd84e18e546ecd45aa8040464f8 (diff) | |
[improve][core][common]部分命名问题,新增部分函数
27 files changed, 805 insertions, 985 deletions
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/EngineConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/EngineConfig.java index a5ac06e..3591ce6 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/EngineConfig.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/EngineConfig.java @@ -13,23 +13,15 @@ import static com.google.common.base.Preconditions.checkNotNull; @Data public class EngineConfig implements Serializable { - private int tickTupleFreqSecs = ServerConfigOptions.TICK_TUPLE_FREQ_SECS.defaultValue(); - private int gtpcScanMaxRows = ServerConfigOptions.GTPC_SCAN_MAX_ROWS.defaultValue(); - private int radiusScanMaxRows = ServerConfigOptions.RADIUS_SCAN_MAX_ROWS.defaultValue(); - private String radiusTableName = ServerConfigOptions.RADIUS_TABLE_NAME.defaultValue(); - private String gtpcTableName = ServerConfigOptions.GTPC_TABLE_NAME.defaultValue(); - private int hbaseRpcTimeout = ServerConfigOptions.HBASE_RPC_TIMEOUT.defaultValue(); + private HttpConPoolConfig httpConPoolConfig = ServerConfigOptions.HTTP_CON_POOL.defaultValue(); private List<KnowledgeConfig> knowledgeBaseConfig = ServerConfigOptions.KNOWLEDGE_BASE.defaultValue(); - private SnowflakeConfig snowflakeConfig = ServerConfigOptions.SNOWFLAKE.defaultValue(); private NacosConfig nacosConfig = ServerConfigOptions.NACOS.defaultValue(); private ConsulConfig consulConfig = ServerConfigOptions.CONSUL.defaultValue(); - private HosConfig hosConfig = ServerConfigOptions.HOS.defaultValue(); private ZookeeperConfig zookeeperConfig = ServerConfigOptions.ZOOKEEPER.defaultValue(); private HdfsConfig hdfsConfig = ServerConfigOptions.HDFS.defaultValue(); - public void setKnowledgeBaseConfig(List<KnowledgeConfig> knowledgeBaseConfig) { checkNotNull(knowledgeBaseConfig, ServerConfigOptions.KNOWLEDGE_BASE + "knowledgeConfig should not be null"); this.knowledgeBaseConfig = knowledgeBaseConfig; diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/ServerConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/ServerConfigOptions.java index 4cce3a0..04bc7ec 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/ServerConfigOptions.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/ServerConfigOptions.java @@ -9,41 +9,6 @@ import java.util.Map; public class ServerConfigOptions { - public static final Option<Integer> TICK_TUPLE_FREQ_SECS = - Options.key("tick_tuple_freq_secs") - .intType() - .defaultValue(60) - .withDescription("The frequency of tick tuple (in seconds)."); - - public static final Option<Integer> GTPC_SCAN_MAX_ROWS = - Options.key("gtpc_scan_max_rows") - .intType() - .defaultValue(10000) - .withDescription("The max rows of gtpc scan."); - - public static final Option<Integer> RADIUS_SCAN_MAX_ROWS = - Options.key("radius_scan_max_rows") - .intType() - .defaultValue(10000) - .withDescription("The max rows of radius scan."); - - public static final Option<String> RADIUS_TABLE_NAME = - Options.key("radius_table_name") - .stringType() - .defaultValue("RADIUS-TABLE") - .withDescription("The table name of radius."); - - public static final Option<String> GTPC_TABLE_NAME = - Options.key("gtpc_table_name") - .stringType() - .defaultValue("GTPC-TABLE") - .withDescription("The table name of gtpc."); - - public static final Option<Integer> HBASE_RPC_TIMEOUT = - Options.key("hbase_rpc_timeout") - .intType() - .defaultValue(60000) - .withDescription("The rpc timeout of hbase."); public static final Option<Integer> HTTP_CON_POOL_MAX_TOTAL = Options.key("max_total") @@ -101,10 +66,6 @@ public class ServerConfigOptions { .defaultValue(new ArrayList<>()) .withDescription("The files of knowledgebase."); - public static final Option<List<KnowledgeConfig>> KNOWLEDGE_BASE_ASN_LOOKUP = Options.key("asnlookup") - .type(new TypeReference<List<KnowledgeConfig>>() {}) - .noDefaultValue() - .withDescription("The asn lookup configuration."); public static final Option<String> KNOWLEDGE_BASE_STORAGE_FS_TYPE = Options.key("fs_type") .stringType() @@ -116,9 +77,6 @@ public class ServerConfigOptions { .defaultValue("") .withDescription("The default path of knowledge base storage."); - public static final Option<StorageConfig> KNOWLEDGE_BASE_STORAGE = Options.key("storage").type(new TypeReference<StorageConfig>() {}) - .defaultValue(new StorageConfig()) - .withDescription("The storage configuration of knowledge base."); public static final Option<List<KnowledgeConfig>> KNOWLEDGE_BASE = diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamDomConfigProcessor.java b/groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamDomConfigProcessor.java index 558bf69..ccdd58a 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamDomConfigProcessor.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamDomConfigProcessor.java @@ -53,30 +53,14 @@ public class YamlGrootStreamDomConfigProcessor extends AbstractDomConfigProcesso final EngineConfig engineConfig = config.getEngineConfig(); for (Node node : childElements(engineNode)) { String name = cleanNodeName(node); - if (ServerConfigOptions.TICK_TUPLE_FREQ_SECS.key().equals(name)) { - engineConfig.setTickTupleFreqSecs(getIntegerValue(ServerConfigOptions.TICK_TUPLE_FREQ_SECS.key(), getTextContent(node))); - } else if (ServerConfigOptions.GTPC_SCAN_MAX_ROWS.key().equals(name)) { - engineConfig.setGtpcScanMaxRows(getIntegerValue(ServerConfigOptions.GTPC_SCAN_MAX_ROWS.key(), getTextContent(node))); - } else if (ServerConfigOptions.RADIUS_SCAN_MAX_ROWS.key().equals(name)) { - engineConfig.setRadiusScanMaxRows(getIntegerValue(ServerConfigOptions.RADIUS_SCAN_MAX_ROWS.key(), getTextContent(node))); - } else if (ServerConfigOptions.RADIUS_TABLE_NAME.key().equals(name)) { - engineConfig.setRadiusTableName(getTextContent(node)); - } else if (ServerConfigOptions.GTPC_TABLE_NAME.key().equals(name)) { - engineConfig.setGtpcTableName(getTextContent(node)); - } else if (ServerConfigOptions.HBASE_RPC_TIMEOUT.key().equals(name)) { - engineConfig.setHbaseRpcTimeout(getIntegerValue(ServerConfigOptions.HBASE_RPC_TIMEOUT.key(), getTextContent(node))); - } else if (ServerConfigOptions.HTTP_CON_POOL.key().equals(name)) { + if (ServerConfigOptions.HTTP_CON_POOL.key().equals(name)) { engineConfig.setHttpConPoolConfig(parseHttpConPoolConfig(node)); } else if (ServerConfigOptions. KNOWLEDGE_BASE.key().equals(name)) { engineConfig.setKnowledgeBaseConfig(parseKnowledgeBaseConfig(node)); - } else if (ServerConfigOptions.SNOWFLAKE.key().equals(name)) { - engineConfig.setSnowflakeConfig(parseSnowflakeConfig(node)); } else if (ServerConfigOptions.NACOS.key().equals(name)) { engineConfig.setNacosConfig(parseNacosConfig(node)); } else if (ServerConfigOptions.CONSUL.key().equals(name)) { engineConfig.setConsulConfig(parseConsulConfig(node)); - } else if (ServerConfigOptions.HOS.key().equals(name)) { - engineConfig.setHosConfig(parseHosConfig(node)); } else if (ServerConfigOptions.ZOOKEEPER.key().equals(name)) { engineConfig.setZookeeperConfig(parseZookeeperConfig(node)); } else if (ServerConfigOptions.HDFS.key().equals(name)) { @@ -89,18 +73,6 @@ public class YamlGrootStreamDomConfigProcessor extends AbstractDomConfigProcesso } } - private HosConfig parseHosConfig(Node hosNode) { - HosConfig hosConfig = new HosConfig(); - for (Node node : childElements(hosNode)) { - String name = cleanNodeName(node); - if (ServerConfigOptions.HOS_TOKEN.key().equals(name)) { - hosConfig.setToken(getTextContent(node)); - } else { - LOGGER.warning("Unrecognized configuration element: " + name); - } - } - return hosConfig; - } private ZookeeperConfig parseZookeeperConfig(Node zookeeperNode) { ZookeeperConfig zookeeperConfig = new ZookeeperConfig(); @@ -170,18 +142,7 @@ public class YamlGrootStreamDomConfigProcessor extends AbstractDomConfigProcesso return nacosConfig; } - private SnowflakeConfig parseSnowflakeConfig(Node snowflakeNode) { - SnowflakeConfig snowflakeConfig = new SnowflakeConfig(); - for (Node node : childElements(snowflakeNode)) { - String name = cleanNodeName(node); - if (ServerConfigOptions.SNOWFLAKE_DATA_CENTER_ID.key().equals(name)) { - snowflakeConfig.setDataCenterId(getTextContent(node)); - } else { - LOGGER.warning("Unrecognized configuration element: " + name); - } - } - return snowflakeConfig; - } + private HttpConPoolConfig parseHttpConPoolConfig(Node httpConPoolNode) { HttpConPoolConfig httpConPoolConfig = new HttpConPoolConfig(); for (Node node : childElements(httpConPoolNode)) { @@ -213,8 +174,6 @@ public class YamlGrootStreamDomConfigProcessor extends AbstractDomConfigProcesso return knowledgeConfigList; } - - private KnowledgeConfig parseKnowledgeConfig(Node asnLookupNode) { KnowledgeConfig knowledgeConfig = new KnowledgeConfig(); for (Node node : childElements(asnLookupNode)) { diff --git a/groot-common/src/main/resources/groot-platform-knowledge-base-plugin b/groot-common/src/main/resources/groot-platform-knowledge-base-plugin index b3f1204..c8a3f7c 100644 --- a/groot-common/src/main/resources/groot-platform-knowledge-base-plugin +++ b/groot-common/src/main/resources/groot-platform-knowledge-base-plugin @@ -1 +1,2 @@ -com.geedgenetworks.core.utils.ASNKnowledgeBase
\ No newline at end of file +com.geedgenetworks.core.utils.KnowlegdeBase.AsnKnowledgeBase +com.geedgenetworks.core.utils.KnowlegdeBase.IpKnowledgeBase
\ No newline at end of file diff --git a/groot-common/src/main/resources/groot-platform-plugin b/groot-common/src/main/resources/groot-platform-plugin index 041a20c..bebf1e9 100644 --- a/groot-common/src/main/resources/groot-platform-plugin +++ b/groot-common/src/main/resources/groot-platform-plugin @@ -6,4 +6,4 @@ com.geedgenetworks.core.udf.JsonExtract com.geedgenetworks.core.udf.UnixTimestamp com.geedgenetworks.core.udf.Domain com.geedgenetworks.core.udf.DecodeBase64 -com.geedgenetworks.core.udf.IpLocation
\ No newline at end of file +com.geedgenetworks.core.udf.GeoIpLookup
\ No newline at end of file diff --git a/groot-common/src/main/resources/grootstream.yaml b/groot-common/src/main/resources/grootstream.yaml index 153529a..d0906bc 100644 --- a/groot-common/src/main/resources/grootstream.yaml +++ b/groot-common/src/main/resources/grootstream.yaml @@ -21,8 +21,8 @@ grootstream: files: default_mmdb_file: 7ce2f9890950ba90-fcc25696bf11a8a0 user_defined_mmdb_file: 7ce2f9890950ba90-71f13b3736863ddb - - name: cn_asnlookup - type: asnlookup + - name: tsg_geoiplookup + type: geoiplookup files: default_mmdb_file: 7ce2f9890950ba90-fcc25696bf11a8a0 user_defined_mmdb_file: 7ce2f9890950ba90-71f13b3736863ddb diff --git a/groot-common/src/test/java/com/geedgenetworks/common/config/YamlGrootStreamConfigParserTest.java b/groot-common/src/test/java/com/geedgenetworks/common/config/YamlGrootStreamConfigParserTest.java index c9634c8..f6f27de 100644 --- a/groot-common/src/test/java/com/geedgenetworks/common/config/YamlGrootStreamConfigParserTest.java +++ b/groot-common/src/test/java/com/geedgenetworks/common/config/YamlGrootStreamConfigParserTest.java @@ -16,7 +16,6 @@ public class YamlGrootStreamConfigParserTest { throw new RuntimeException("can't find yaml in resources"); } Assertions.assertNotNull(config); - Assertions.assertEquals(90, config.getEngineConfig().getTickTupleFreqSecs()); Assertions.assertNotNull(config.getEngineConfig().getNacosConfig().getServerAddr()); Assertions.assertTrue(config.getEngineConfig().getHttpConPoolConfig().getSocketTimeout() > 0); Assertions.assertNotNull(config.getEngineConfig().getKnowledgeBaseConfig()); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java index 22abbfc..a81bbd3 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java @@ -10,7 +10,7 @@ import com.geedgenetworks.core.pojo.Event; import com.geedgenetworks.core.pojo.ProjectionConfig; import com.geedgenetworks.core.pojo.UDFContext; import com.geedgenetworks.core.udf.UDF; -import com.geedgenetworks.core.utils.KnowledgeBaseSchedule; +import com.geedgenetworks.core.utils.KnowlegdeBase.KnowledgeBaseSchedule; import com.googlecode.aviator.AviatorEvaluator; import com.googlecode.aviator.AviatorEvaluatorInstance; import com.googlecode.aviator.Expression; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java index 8c0ce93..14897ff 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java @@ -8,11 +8,11 @@ import com.geedgenetworks.common.exception.CommonErrorCode; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.core.pojo.Event; import com.geedgenetworks.core.pojo.UDFContext; -import com.geedgenetworks.core.utils.ASNKnowledgeBase; +import com.geedgenetworks.core.utils.KnowlegdeBase.AsnKnowledgeBase; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.geedgenetworks.core.utils.KnowledgeBaseSchedule; +import com.geedgenetworks.core.utils.KnowlegdeBase.KnowledgeBaseSchedule; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; @@ -37,6 +37,9 @@ public class AsnLookup implements UDF { if(vender.equals(knowledgeConfig.getName())){ KnowledgeBaseSchedule.initKnowledgeBase(knowledgeConfig); } + else { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param vendor_id value is not correct"); + } } } @@ -51,7 +54,7 @@ public class AsnLookup implements UDF { event.getExtractedFields() .put( udfContext.getOutput_fields().get(0), - ASNKnowledgeBase.getVenderWithAsnLookup() + AsnKnowledgeBase.getVenderWithAsnLookup() .get(vender) .asnLookup( event.getExtractedFields() diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Combine.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Combine.java deleted file mode 100644 index 2080768..0000000 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Combine.java +++ /dev/null @@ -1,70 +0,0 @@ -package com.geedgenetworks.core.udf; - -import com.geedgenetworks.core.pojo.Event; -import com.geedgenetworks.core.pojo.UDFContext; -import org.apache.flink.api.common.functions.RuntimeContext; - -import java.util.ArrayList; -import java.util.Comparator; -import java.util.Map; - -public class Combine implements UDF { - - private UDFContext udfContext; - - private StringBuilder stringBuilder; - - - @Override - public void open(RuntimeContext runtimeContext, UDFContext udfContext) { - - this.udfContext = udfContext; - if (udfContext.getParameters() != null - && udfContext.getParameters().size() != 0) { - ArrayList<String> keys = new ArrayList<>(); - for (Map.Entry<String, Object> entry : udfContext.getParameters().entrySet()) { - String key = entry.getKey(); - keys.add(key); - } - keys.sort(new NaturalNumberStringComparator()); - stringBuilder = new StringBuilder(); - for (String key : keys) { - stringBuilder.append(udfContext.getParameters().get(key)); - } - } - } - - @Override - public Event evaluate(Event event) { - - if (event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) { - stringBuilder.append( - event.getExtractedFields().get(udfContext.getLookup_fields().get(0))); - event.getExtractedFields() - .put(udfContext.getOutput_fields().get(0), stringBuilder.toString()); - } - return event; - } - - @Override - public String functionName() { - return "COMBINE"; - } - - @Override - public void close() { - - } - - static class NaturalNumberStringComparator implements Comparator<String> { - @Override - public int compare(String s1, String s2) { - String[] s1Array = s1.split("path"); - String[] s2Array = s2.split("path"); - Integer n1 = Integer.parseInt(s1Array[1]); - Integer n2 = Integer.parseInt(s2Array[1]); - return n1.compareTo(n2); - } - } - -} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/IpLocation.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java index 880862a..4f09969 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/IpLocation.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java @@ -1,8 +1,14 @@ package com.geedgenetworks.core.udf; +import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.config.EngineConfig; +import com.geedgenetworks.common.config.KnowledgeConfig; +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.core.pojo.Event; import com.geedgenetworks.core.pojo.UDFContext; -import com.geedgenetworks.core.utils.IPKnowledgeBase; +import com.geedgenetworks.core.utils.KnowlegdeBase.IpKnowledgeBase; +import com.geedgenetworks.core.utils.KnowlegdeBase.KnowledgeBaseSchedule; import com.geedgenetworks.utils.StringUtil; import cn.hutool.log.Log; @@ -10,10 +16,11 @@ import cn.hutool.log.LogFactory; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.TypeReference; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.Configuration; import java.util.HashMap; -public class IpLocation implements UDF { +public class GeoIpLookup implements UDF { private UDFContext udfContext; private static final Log logger = LogFactory.get(); @@ -22,12 +29,25 @@ public class IpLocation implements UDF { @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { + checkUdfContext(udfContext); this.udfContext = udfContext; - this.vender = udfContext.getParameters().get("kb_vender").toString(); - // this.option = udfContext.getParameters().get("option").toString(); - this.option ="IP_TO_COUNTRY"; + this.vender = udfContext.getParameters().get("vendor_id").toString(); + this.option = udfContext.getParameters().get("option").toString(); + Configuration configuration = (Configuration) runtimeContext + .getExecutionConfig().getGlobalJobParameters(); + EngineConfig engineConfig = com.alibaba.fastjson.JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), EngineConfig.class); + for(KnowledgeConfig knowledgeConfig : engineConfig.getKnowledgeBaseConfig()){ + if(vender.equals(knowledgeConfig.getName())){ + KnowledgeBaseSchedule.initKnowledgeBase(knowledgeConfig); + } + else { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param vendor_id value is not correct"); + } + } } + + @Override public Event evaluate(Event event) { if (event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) { @@ -37,7 +57,7 @@ public class IpLocation implements UDF { event.getExtractedFields() .put( udfContext.getOutput_fields().get(0), - IPKnowledgeBase.venderWithIpLookup + IpKnowledgeBase.getVenderWithIpLookup() .get(vender) .countryLookup( event.getExtractedFields() @@ -51,7 +71,7 @@ public class IpLocation implements UDF { event.getExtractedFields() .put( udfContext.getOutput_fields().get(0), - IPKnowledgeBase.venderWithIpLookup + IpKnowledgeBase.getVenderWithIpLookup() .get(vender) .provinceLookup( event.getExtractedFields() @@ -65,7 +85,7 @@ public class IpLocation implements UDF { event.getExtractedFields() .put( udfContext.getOutput_fields().get(0), - IPKnowledgeBase.venderWithIpLookup + IpKnowledgeBase.getVenderWithIpLookup() .get(vender) .cityLookup( event.getExtractedFields() @@ -79,7 +99,7 @@ public class IpLocation implements UDF { event.getExtractedFields() .put( udfContext.getOutput_fields().get(0), - IPKnowledgeBase.venderWithIpLookup + IpKnowledgeBase.getVenderWithIpLookup() .get(vender) .cityLookupDetail( event.getExtractedFields() @@ -93,7 +113,7 @@ public class IpLocation implements UDF { event.getExtractedFields() .put( udfContext.getOutput_fields().get(0), - IPKnowledgeBase.venderWithIpLookup + IpKnowledgeBase.getVenderWithIpLookup() .get(vender) .locationLookupDetail( event.getExtractedFields() @@ -103,9 +123,9 @@ public class IpLocation implements UDF { .get(0)) .toString())); break; - case "IP_TO_GEO": + case "IP_TO_LATLNG": String geo = - IPKnowledgeBase.venderWithIpLookup + IpKnowledgeBase.getVenderWithIpLookup() .get(vender) .latLngLookup( event.getExtractedFields() @@ -125,9 +145,9 @@ public class IpLocation implements UDF { case "IP_TO_PROVIDER": HashMap<String, Object> serverIpMap = JSON.parseObject( - IPKnowledgeBase.venderWithIpLookup + IpKnowledgeBase.getVenderWithIpLookup() .get(vender) - .asnLookupDetail( + .ispLookup( event.getExtractedFields() .get( udfContext @@ -141,6 +161,34 @@ public class IpLocation implements UDF { udfContext.getOutput_fields().get(0), serverIpMap.getOrDefault("isp", StringUtil.EMPTY)); break; + case "IP_TO_JSON ": + event.getExtractedFields() + .put( + udfContext.getOutput_fields().get(0), + IpKnowledgeBase.getVenderWithIpLookup() + .get(vender) + .infoLookupToJSONString( + event.getExtractedFields() + .get( + udfContext + .getLookup_fields() + .get(0)) + .toString())); + break; + case "IP_TO_OBJECT": + event.getExtractedFields() + .put( + udfContext.getOutput_fields().get(0), + IpKnowledgeBase.getVenderWithIpLookup() + .get(vender) + .infoLookup( + event.getExtractedFields() + .get( + udfContext + .getLookup_fields() + .get(0)) + .toString())); + break; default: break; } @@ -148,6 +196,48 @@ public class IpLocation implements UDF { return event; } + private void checkUdfContext(UDFContext udfContext) { + if(udfContext.getLookup_fields().size() != 1){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value"); + } + if(udfContext.getOutput_fields().size() != 1){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); + } + if(udfContext.getParameters() == null){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function needs parameters"); + } + if(!udfContext.getParameters().containsKey("vendor_id")){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey vendor_id"); + } + if(!udfContext.getParameters().containsKey("option")){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey option"); + } + else{ + if(!udfContext.getParameters().get("option").toString().equals("IP_TO_COUNTRY") && //IP_TO_COUNTRY + !udfContext.getParameters().get("option").toString().equals("IP_TO_PROVINCE") && //IP_TO_PROVINCE + !udfContext.getParameters().get("option").toString().equals("IP_TO_CITY") && //IP_TO_CITY + !udfContext.getParameters().get("option").toString().equals("IP_TO_SUBDIVISION_ADDR") && //IP_TO_SUBDIVISION_ADDR + !udfContext.getParameters().get("option").toString().equals("IP_TO_DETAIL") && //IP_TO_DETAIL + !udfContext.getParameters().get("option").toString().equals("IP_TO_LATLNG") && //IP_TO_LATLNG + !udfContext.getParameters().get("option").toString().equals("IP_TO_PROVIDER") && //IP_TO_PROVIDER + !udfContext.getParameters().get("option").toString().equals("IP_TO_JSON") && //IP_TO_JSON + !udfContext.getParameters().get("option").toString().equals("IP_TO_OBJECT") ){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters option value is not correct"); + } + if(udfContext.getParameters().get("option").toString().equals("IP_TO_OBJECT")){ + + if(!udfContext.getParameters().containsKey("geolocation_field_mapping")){ + + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey geolocation_field_mapping"); + + } + + } + } + + } + + @Override public String functionName() { return "IP_LOCATION"; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java new file mode 100644 index 0000000..6af2741 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java @@ -0,0 +1,86 @@ +package com.geedgenetworks.core.udf; + +import com.alibaba.fastjson.JSON; +import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.config.EngineConfig; +import com.geedgenetworks.core.pojo.Event; +import com.geedgenetworks.core.pojo.UDFContext; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.Configuration; + +import java.util.*; + +public class PathCombine implements UDF { + + private UDFContext udfContext; + + private StringBuilder stringBuilder; + + private Map<String, String> pathParameters = new LinkedHashMap<>(); + + @Override + public void open(RuntimeContext runtimeContext, UDFContext udfContext) { + + this.udfContext = udfContext; +/* + Configuration configuration = (Configuration) runtimeContext + .getExecutionConfig().getGlobalJobParameters(); + EngineConfig engineConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), EngineConfig.class); +*/ + + if (udfContext.getParameters() != null + && !udfContext.getParameters().isEmpty()) { + + Object object = udfContext.getParameters().get("path"); + ArrayList<Object> arrayList = new ArrayList<>(Arrays.asList(object)); + for (Object key : arrayList) { + String column =key.toString(); + if(column.contains("global")){ + pathParameters.put(column,"");//待定义全局变量 + } + else { + pathParameters.put(column,"dynamic"); + } + } + + ArrayList<String> keys = new ArrayList<>(); + for (Map.Entry<String, Object> entry : udfContext.getParameters().entrySet()) { + String key = entry.getKey(); + keys.add(key); + } + stringBuilder = new StringBuilder(); + for (String key : keys) { + stringBuilder.append(udfContext.getParameters().get(key)); + } + } + } + + + + @Override + public Event evaluate(Event event) { + StringBuilder stringBuilder = new StringBuilder(); + for (Map.Entry<String, String> entry : pathParameters.entrySet()) { + if (entry.getValue().equals("dynamic")) { + stringBuilder.append(event.getExtractedFields().getOrDefault(entry.getKey(),"").toString()); + } + else { + stringBuilder.append(entry.getValue()); + } + } + event.getExtractedFields().put(udfContext.getOutput_fields().get(0), stringBuilder.toString()); + return event; + } + + @Override + public String functionName() { + return "PATH_COMBINE"; + } + + @Override + public void close() { + + } + + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java new file mode 100644 index 0000000..c0ad7f1 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java @@ -0,0 +1,84 @@ +package com.geedgenetworks.core.udf; + +import com.geedgenetworks.common.exception.CommonErrorCode; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.core.pojo.Event; +import com.geedgenetworks.core.pojo.UDFContext; +import com.geedgenetworks.utils.FormatUtils; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.functions.RuntimeContext; + +import java.time.Instant; + +@Slf4j +public class UnixTimestampConverter implements UDF { + + private UDFContext udfContext; + + private String precision; + @Override + public void open(RuntimeContext runtimeContext, UDFContext udfContext) { + + this.udfContext = udfContext; + + if(udfContext.getOutput_fields().size() != 1){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value"); + } + if(udfContext.getParameters() == null){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function must contain parameters"); + } + if(!udfContext.getParameters().containsKey("precision")){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey precision"); + } + else{ + if(!udfContext.getParameters().get("precision").toString().equals("seconds") && + !udfContext.getParameters().get("precision").toString().equals("milliseconds")){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters option value is not correct"); + }else { + + this.precision =udfContext.getParameters().get("precision").toString(); + } + } + + + } + + + @Override + public Event evaluate(Event event) { + + Long timestamp = Long.parseLong(event.getExtractedFields().get(udfContext.getLookup_fields().get(0)).toString()); + Instant instant = null; + if (String.valueOf(timestamp).length() ==13 ) { + // 时间戳长度大于10,表示为毫秒级时间戳 + instant = Instant.ofEpochMilli(timestamp); + } else if(String.valueOf(timestamp).length() ==10){ + // 时间戳长度小于等于10,表示为秒级时间戳 + instant = Instant.ofEpochSecond(timestamp); + }else { + return event; + } + switch (precision) { + case "seconds": + timestamp = instant.getEpochSecond(); + break; + case "milliseconds": + timestamp = instant.toEpochMilli(); + break; + } + event.getExtractedFields().put(udfContext.getOutput_fields().get(0), timestamp); + return event; + + } + + + @Override + public String functionName() { + return "UNIX_TIMESTAMP_FUNCTION"; + } + + @Override + public void close() { + + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/ASNKnowledgeBase.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/ASNKnowledgeBase.java deleted file mode 100644 index d906949..0000000 --- a/groot-core/src/main/java/com/geedgenetworks/core/utils/ASNKnowledgeBase.java +++ /dev/null @@ -1,156 +0,0 @@ -package com.geedgenetworks.core.utils; - - -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.TypeReference; -import com.geedgenetworks.common.config.KnowledgeConfig; - -import com.geedgenetworks.common.utils.HttpClientUtils; -import com.geedgenetworks.core.pojo.KnowLedgeEntity; -import com.geedgenetworks.utils.IpLookupV2; -import lombok.Getter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayInputStream; -import java.io.InputStream; -import java.net.URI; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.*; - -import static scala.tools.fusesource_embedded.jansi.AnsiRenderer.test; - - -public class ASNKnowledgeBase extends knowledgeUtil { - - - private static final Logger LOG = LoggerFactory.getLogger(ASNKnowledgeBase.class); - @Getter - private static Map<String, IpLookupV2> venderWithAsnLookup = new HashMap<>(); - private static final Logger logger = LoggerFactory.getLogger(ASNKnowledgeBase.class); - private static ASNKnowledgeBase instance; - // 私有构造函数,防止外部实例化 - - public static synchronized ASNKnowledgeBase getInstance() throws Exception { - if (instance == null) { - instance = new ASNKnowledgeBase(); - } - return instance; - } - - - public Boolean updateKnowledgeBase(KnowledgeConfig knowledgeConfig) throws Exception { - - IpLookupV2.Builder asnLookupBuilder = new IpLookupV2.Builder(false); - - for (int i = 0; i < knowledgeConfig.getFiles().size(); i++) { - - KnowLedgeEntity knowLedgeEntity = getMetadata(knowledgeConfig.getFiles().get(i)); - - byte[] fileByte = downloadFile(knowLedgeEntity.getPath(), knowLedgeEntity.getIsValid()); - - if (calculateSHA256(fileByte).equals(knowLedgeEntity.getSha256())) { - switch (i) { - case 0: - asnLookupBuilder.loadAsnDataFile(new ByteArrayInputStream(fileByte)); - - break; - case 1: - asnLookupBuilder.loadDataFilePrivate(new ByteArrayInputStream(fileByte)); - - break; - } - } else { - logger.error("check file sha256 error "); - return false; - } - } - IpLookupV2 ipLookup = asnLookupBuilder.build(); - venderWithAsnLookup.put(knowledgeConfig.getName(), ipLookup); - return true; - } - - @Override - public String functionName() { - return "asnlookup"; - } - - - public static String calculateSHA256(byte[] data) throws NoSuchAlgorithmException { - MessageDigest digest = MessageDigest.getInstance("SHA-256"); - - StringBuilder result = new StringBuilder(); - for (byte b : digest.digest(data)) { - result.append(String.format("%02x", b)); - } - return result.toString(); - } - - - private static KnowLedgeEntity getMetadata(String id) { - - try { -// HttpClientUtils httpClientUtils = new HttpClientUtils(); - // String metadate = httpClientUtils.httpGet(URI.create("/v1/knowledge_base/"+ id +"/meta")) ; -//临时写死 - KnowLedgeEntity knowLedgeEntity = new KnowLedgeEntity(); - knowLedgeEntity.setIsValid(1); - knowLedgeEntity.setSha256("2a2defd9c96ea5434ac628d3dc4fc6790e30d84c218b02fe798dbc6772dbf76c"); - knowLedgeEntity.setPath("http://192.168.44.12:9098/hos/knowledge_base_hos_bucket/1e8aabe7-2170-445c-9124-6051332ca7d6-YXNuX2J1aWx0aW4=.mmdb"); - // List<KnowLedgeEntity> knowLedgeEntityList = JSON.parseObject(metadate, new TypeReference<List<KnowLedgeEntity>>() {}); - // return knowLedgeEntityList.get(0); - return knowLedgeEntity; - } catch (Exception e) { - logger.error("get file Metadata " + id + " error: " + e.getMessage()); - } - - return null; - } - - private static byte[] downloadFile(String fileUrl, int isValid) { - byte[] inputStream = null; - if (isValid == 1) { - try { - long start = System.currentTimeMillis(); - HttpClientUtils httpClientUtils = new HttpClientUtils(); - inputStream = httpClientUtils.httpGetByte(fileUrl, 3000); - long end = System.currentTimeMillis(); - logger.info( - "download file " + fileUrl + " finished, speed " + (end - start) + " ms"); - } catch (Exception e) { - logger.error("download file " + fileUrl + " error: " + e.getMessage()); - } - } - return inputStream; - } - - private static void test(String ip) { - - IpLookupV2 ipLookup = venderWithAsnLookup.get("TSG"); - System.out.println("cityLookupDetail:" + ipLookup.cityLookupDetail(ip)); - System.out.println("cityLookup:" + ipLookup.cityLookup(ip)); - System.out.println("countryLookup:" + ipLookup.countryLookup(ip)); - System.out.println("locationLookupDetail:" + ipLookup.locationLookupDetail(ip)); - System.out.println("provinceLookup:" + ipLookup.provinceLookup(ip)); - System.out.println("----------------------------"); - - long start1 = System.currentTimeMillis(); - System.out.println( - "administrativeAreaLookupDetail:" + ipLookup.administrativeAreaLookupDetail(ip)); - System.out.println("asnLookupDetail:" + ipLookup.asnLookupDetail(ip)); - System.out.println("cityLookupDetail:" + ipLookup.cityLookupDetail(ip)); - System.out.println("locationLookupDetail:" + ipLookup.locationLookupDetail(ip)); - System.out.println("ispLookup:" + ipLookup.ispLookup(ip)); - System.out.println("cityLatLngLookup:" + ipLookup.cityLatLngLookup(ip)); - long end1 = System.currentTimeMillis(); - System.out.println(end1 - start1); - - System.out.println("--------------------------------------"); - System.out.println("infoLookupToCSV:" + ipLookup.infoLookupToCSV(ip)); - System.out.println("infoLookupToJson:" + ipLookup.ispLookup(ip)); - System.out.println("----------------------------"); - } - - -} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/IPKnowledgeBase.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/IPKnowledgeBase.java deleted file mode 100644 index 4f7aabf..0000000 --- a/groot-core/src/main/java/com/geedgenetworks/core/utils/IPKnowledgeBase.java +++ /dev/null @@ -1,104 +0,0 @@ -package com.geedgenetworks.core.utils; - - -import com.geedgenetworks.core.pojo.KnowLedgeEntity; -import com.geedgenetworks.utils.IpLookupV2; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.util.*; - - -public class IPKnowledgeBase { - - public static Map<String, IpLookupV2> venderWithIpLookup = new HashMap<>(); - private static final Logger logger = LoggerFactory.getLogger(IPKnowledgeBase.class); - - - - private static IPKnowledgeBase instance; - - // 私有构造函数,防止外部实例化 - public IPKnowledgeBase() { - - } - - public static synchronized IPKnowledgeBase getInstance(Map<String, KnowLedgeEntity> map) throws Exception { - if (instance == null) { - updateKnowledgeBase(map); - instance = new IPKnowledgeBase( ); - } - return instance; - } - - public static void updateKnowledgeBase(Map<String, KnowLedgeEntity> map) throws Exception { - - /* for (Map.Entry<String, KnowLedgeEntity> entry : map.entrySet()) { - IpLookupV2.Builder ipLookupBuilder = new IpLookupV2.Builder(false); - - try { - if ("ip_v4_built_in_id".equals(entry.getKey())) { - ipLookupBuilder.loadDataFileV4( - entry.getValue().getInputStream()); - } - if ("ip_v6_built_in_id".equals(entry.getKey())) { - ipLookupBuilder.loadDataFileV6( - entry.getValue().getInputStream()); - } - if ("ip_v4_user_defined_id".equals(entry.getKey())) { - ipLookupBuilder.loadDataFilePrivateV4( - entry.getValue().getInputStream()); - } - if ("ip_v6_user_defined_id".equals(entry.getKey())) { - ipLookupBuilder.loadDataFilePrivateV6( - entry.getValue().getInputStream()); - } - } catch (Exception e) { - logger.error( - "加载ip知识库" - + entry.getKey() - + ":" - + entry.getValue() - + "失败 " - + e.toString()); - - - } - IpLookupV2 ipLookup = ipLookupBuilder.build(); - venderWithIpLookup.put(entry.getKey(), ipLookup); - }*/ - - } - - public static void main(String[] args) throws Exception { - String ip = "114.64.231.114"; - - test(ip); - } - - private static void test(String ip) { - - IpLookupV2 ipLookup = venderWithIpLookup.get("TSG"); - System.out.println("cityLookupDetail:" + ipLookup.cityLookupDetail(ip)); - System.out.println("cityLookup:" + ipLookup.cityLookup(ip)); - System.out.println("countryLookup:" + ipLookup.countryLookup(ip)); - System.out.println("locationLookupDetail:" + ipLookup.locationLookupDetail(ip)); - System.out.println("provinceLookup:" + ipLookup.provinceLookup(ip)); - System.out.println("----------------------------"); - - long start1 = System.currentTimeMillis(); - System.out.println( - "administrativeAreaLookupDetail:" + ipLookup.administrativeAreaLookupDetail(ip)); - System.out.println("asnLookupDetail:" + ipLookup.asnLookupDetail(ip)); - System.out.println("cityLookupDetail:" + ipLookup.cityLookupDetail(ip)); - System.out.println("locationLookupDetail:" + ipLookup.locationLookupDetail(ip)); - System.out.println("ispLookup:" + ipLookup.ispLookup(ip)); - System.out.println("cityLatLngLookup:" + ipLookup.cityLatLngLookup(ip)); - long end1 = System.currentTimeMillis(); - System.out.println(end1 - start1); - - System.out.println("--------------------------------------"); - System.out.println("infoLookupToCSV:" + ipLookup.infoLookupToCSV(ip)); - System.out.println("infoLookupToJson:" + ipLookup.ispLookup(ip)); - System.out.println("----------------------------"); - } -} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/IpLookupUtils.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/IpLookupUtils.java deleted file mode 100644 index 06a6e48..0000000 --- a/groot-core/src/main/java/com/geedgenetworks/core/utils/IpLookupUtils.java +++ /dev/null @@ -1,246 +0,0 @@ -package com.geedgenetworks.core.utils; - -import com.geedgenetworks.common.config.CommonConfig; - -import com.geedgenetworks.utils.IpLookupV2; -import com.geedgenetworks.utils.StringUtil; -import org.apache.commons.io.IOUtils; - -import cn.hutool.core.io.file.FileReader; -import cn.hutool.crypto.digest.DigestUtil; -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; - -import java.io.ByteArrayInputStream; -import java.io.InputStream; -import java.util.Map; - -/** - * @author qidaijie - * @version 2022/11/16 15:23 - */ -public class IpLookupUtils { - private static final Log logger = LogFactory.get(); - private static final String ipv4BuiltInName = "ip_v4_built_in.mmdb"; - private static final String ipv6BuiltInName = "ip_v6_built_in.mmdb"; - private static final String ipv4UserDefinedName = "ip_v4_user_defined.mmdb"; - private static final String ipv6UserDefinedName = "ip_v6_user_defined.mmdb"; - private static final String asnV4Name = "asn_v4.mmdb"; - private static final String asnV6Name = "asn_v6.mmdb"; - private static IpLookupV2 ipLookup; - - public IpLookupUtils() { - ipLookup = initIpLookup(); - } - - /** - * 从HDFS下载文件更新IpLookup - * - * @param knowledgeFileCache 广播流下发的元数据信息,包含更新内容 - * @return 更新后的IpLookup - */ - public static void updateIpLookup(Map<String, String> knowledgeFileCache) { - IpLookupV2.Builder builder = new IpLookupV2.Builder(false); - try { - InputStream ipv4BuiltInFile = - getUpdateFile(ipv4BuiltInName, knowledgeFileCache.get(ipv4BuiltInName)); - if (ipv4BuiltInFile != null) { - builder.loadDataFileV4(ipv4BuiltInFile); - } - - InputStream ipv6BuiltInFileFile = - getUpdateFile(ipv6BuiltInName, knowledgeFileCache.get(ipv6BuiltInName)); - if (ipv6BuiltInFileFile != null) { - builder.loadDataFileV6(ipv6BuiltInFileFile); - } - - InputStream ipv4UserDefinedFile = - getUpdateFile(ipv4UserDefinedName, knowledgeFileCache.get(ipv4UserDefinedName)); - if (ipv4UserDefinedFile != null) { - builder.loadDataFilePrivateV4(ipv4UserDefinedFile); - } - - InputStream ipv6UserDefinedFile = - getUpdateFile(ipv6UserDefinedName, knowledgeFileCache.get(ipv6UserDefinedName)); - if (ipv6UserDefinedFile != null) { - builder.loadDataFilePrivateV6(ipv6UserDefinedFile); - } - - InputStream asnV4File = getUpdateFile(asnV4Name, knowledgeFileCache.get(asnV4Name)); - if (asnV4File != null) { - builder.loadAsnDataFileV4(asnV4File); - } - - InputStream asnV6File = getUpdateFile(asnV6Name, knowledgeFileCache.get(asnV6Name)); - if (asnV6File != null) { - builder.loadAsnDataFileV6(asnV6File); - } - ipLookup = builder.build(); - } catch (Exception e) { - - e.printStackTrace(); - } - } - - /** - * 从HDFS下载文件初始化IpLookup - * - * @return 初始化的IpLookup - */ - public static IpLookupV2 initIpLookup() { - IpLookupV2.Builder builder = new IpLookupV2.Builder(false); - InputStream ipv4BuiltInFile = getInitFile(ipv4BuiltInName); - if (ipv4BuiltInFile != null) { - builder.loadDataFileV4(ipv4BuiltInFile); - } - - InputStream ipv6BuiltInFileFile = getInitFile(ipv6BuiltInName); - if (ipv6BuiltInFileFile != null) { - builder.loadDataFileV6(ipv6BuiltInFileFile); - } - - InputStream ipv4UserDefinedFile = getInitFile(ipv4UserDefinedName); - if (ipv4UserDefinedFile != null) { - builder.loadDataFilePrivateV4(ipv4UserDefinedFile); - } - - InputStream ipv6UserDefinedFile = getInitFile(ipv6UserDefinedName); - if (ipv6UserDefinedFile != null) { - builder.loadDataFilePrivateV6(ipv6UserDefinedFile); - } - - InputStream asnV4File = getInitFile(asnV4Name); - if (asnV4File != null) { - builder.loadAsnDataFileV4(asnV4File); - } - - InputStream asnV6File = getInitFile(asnV6Name); - if (asnV6File != null) { - builder.loadAsnDataFileV6(asnV6File); - } - return builder.build(); - } - - /** - * 下载更新文件并校验完整性 - * - * @param fileName 文件名 - * @param sha256 文件校验码 - * @return true or false - */ - private static InputStream getUpdateFile(String fileName, String sha256) { - InputStream inputStream = null; - try { - if (StringUtil.isNotBlank(sha256)) { - byte[] downloadFileBytes = downloadFile(fileName); - if (downloadFileBytes != null) { - String downloadFileSha256Hex = DigestUtil.sha256Hex(downloadFileBytes); - if (sha256.equals(downloadFileSha256Hex)) { - inputStream = new ByteArrayInputStream(downloadFileBytes); - } else { - logger.error("The downloaded file {} sha256 is inconsistent!", fileName); - } - } - } else { - byte[] downloadFileBytes = downloadFile(fileName); - if (downloadFileBytes != null) { - inputStream = new ByteArrayInputStream(downloadFileBytes); - } - } - } catch (RuntimeException e) { - logger.error( - "Failed to download the {} file for update cache, Exception message:{}", - fileName, - e.getMessage()); - } finally { - IOUtils.closeQuietly(inputStream); - } - return inputStream; - } - - /** - * 下载文件(初始化) - * - * @param fileName 文件名 - * @return 文件io流 - */ - private static InputStream getInitFile(String fileName) { - InputStream inputStream = null; - try { - byte[] fileBytes = downloadFile(fileName); - if (fileBytes != null) { - inputStream = new ByteArrayInputStream(fileBytes); - } - } catch (RuntimeException e) { - logger.error( - "Failed to download the {} file for initialization cache, The exception message is:{}", - fileName, - e.getMessage()); - e.printStackTrace(); - } finally { - IOUtils.closeQuietly(inputStream); - } - return inputStream; - } - - /** - * 根据不同文件系统获取定位库io流 - * - * @param fileName 文件名 - * @return 文件io流 - */ - private static byte[] downloadFile(String fileName) { - byte[] fileBytes = null; - try { - String filePath = CommonConfig.KNOWLEDGEBASE_FILE_STORAGE_PATH + fileName; - fileBytes = - "hdfs".equals(CommonConfig.KNOWLEDGEBASE_FILE_STORAGE_TYPE) - ? HadoopUtils.downloadFileByBytes(filePath) - : new FileReader(filePath).readBytes(); - } catch (RuntimeException e) { - logger.error( - "Failed to download file {} from {}, The exception message is:{}", - CommonConfig.KNOWLEDGEBASE_FILE_STORAGE_TYPE, - fileName, - e.getMessage()); - e.printStackTrace(); - } - return fileBytes; - } - - public static String getGeoIpDetail(String ip) { - String detail = ""; - try { - detail = ipLookup.cityLookupDetail(ip); - } catch (NullPointerException npe) { - logger.error("The IP Location MMDB file is not loaded or IP is null! " + npe); - } catch (RuntimeException e) { - logger.error("Get clientIP location error! " + e.getMessage()); - } - return detail; - } - - public static String getGeoIpCountry(String ip) { - String country = ""; - try { - country = ipLookup.countryLookup(ip); - } catch (NullPointerException npe) { - logger.error("The IP Location MMDB file is not loaded or IP is null! " + npe); - } catch (RuntimeException e) { - logger.error("Get ServerIP location error! " + e.getMessage()); - } - return country; - } - - public static String getGeoAsn(String ip) { - String asn = ""; - try { - asn = ipLookup.asnLookup(ip); - } catch (NullPointerException npe) { - logger.error("The ASN MMDB file is not loaded or IP is null! " + npe); - } catch (RuntimeException e) { - logger.error("Get IP ASN error! " + e.getMessage()); - } - return asn; - } -} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java new file mode 100644 index 0000000..00b2b53 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java @@ -0,0 +1,78 @@ +package com.geedgenetworks.core.utils.KnowlegdeBase; + +import com.geedgenetworks.common.config.KnowledgeConfig; +import com.geedgenetworks.common.utils.HttpClientUtils; +import com.geedgenetworks.core.pojo.KnowLedgeEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + +public abstract class AbstractKnowledgeBase { + + private static final Logger logger = LoggerFactory.getLogger(AbstractKnowledgeBase.class); + + protected AbstractKnowledgeBase() { + // 抽象类的构造函数 + } + + abstract Boolean updateKnowledgeBase(KnowledgeConfig knowledgeConfig) ; + + abstract String functionName(); + + + static String calculateSHA256(byte[] data) { + MessageDigest digest = null; + StringBuilder result = new StringBuilder(); + try { + digest = MessageDigest.getInstance("SHA-256"); + for (byte b : digest.digest(data)) { + result.append(String.format("%02x", b)); + } + } catch (NoSuchAlgorithmException e) { + logger.error("calculateSHA256 error: " + e.getMessage()); + } + return result.toString(); + } + + + static byte[] downloadFile(String fileUrl, int isValid) { + byte[] inputStream = null; + if (isValid == 1) { + try { + long start = System.currentTimeMillis(); + HttpClientUtils httpClientUtils = new HttpClientUtils(); + inputStream = httpClientUtils.httpGetByte(fileUrl, 3000); + long end = System.currentTimeMillis(); + logger.info( + "download file " + fileUrl + " finished, speed " + (end - start) + " ms"); + } catch (Exception e) { + logger.error("download file " + fileUrl + " error: " + e.getMessage()); + } + } + return inputStream; + } + + static KnowLedgeEntity getMetadata(String id) { + + try { +// HttpClientUtils httpClientUtils = new HttpClientUtils(); + // String metadate = httpClientUtils.httpGet(URI.create("/v1/knowledge_base/"+ id +"/meta")) ; +//临时写死 + KnowLedgeEntity knowLedgeEntity = new KnowLedgeEntity(); + knowLedgeEntity.setIsValid(1); + knowLedgeEntity.setSha256("c06db87e9a914a8296ca892880c353884ac15b831c485c29fad9889ffb4e0fa8"); + knowLedgeEntity.setPath("http://192.168.44.12:9098/hos/knowledge_base_hos_bucket/4bd16ddb-d1cb-4310-b620-02164335c50d-aXBfYnVpbHRpbg==.mmdb"); + /* knowLedgeEntity.setSha256("2a2defd9c96ea5434ac628d3dc4fc6790e30d84c218b02fe798dbc6772dbf76c"); + knowLedgeEntity.setPath("http://192.168.44.12:9098/hos/knowledge_base_hos_bucket/1e8aabe7-2170-445c-9124-6051332ca7d6-YXNuX2J1aWx0aW4=.mmdb");*/ + // List<KnowLedgeEntity> knowLedgeEntityList = JSON.parseObject(metadate, new TypeReference<List<KnowLedgeEntity>>() {}); + // return knowLedgeEntityList.get(0); + return knowLedgeEntity; + } catch (Exception e) { + logger.error("get file Metadata " + id + " error: " + e.getMessage()); + } + + return null; + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java new file mode 100644 index 0000000..a62acf3 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java @@ -0,0 +1,106 @@ +package com.geedgenetworks.core.utils.KnowlegdeBase; + + +import com.geedgenetworks.common.config.KnowledgeConfig; + +import com.geedgenetworks.core.pojo.KnowLedgeEntity; +import com.geedgenetworks.utils.IpLookupV2; +import lombok.Getter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + + +public class AsnKnowledgeBase extends AbstractKnowledgeBase { + + private static final Logger LOG = LoggerFactory.getLogger(AsnKnowledgeBase.class); + @Getter + private static Map<String, IpLookupV2> venderWithAsnLookup = new ConcurrentHashMap<>(); + private static final Logger logger = LoggerFactory.getLogger(AsnKnowledgeBase.class); + private static AsnKnowledgeBase instance; + // 私有构造函数,防止外部实例化 + + public static synchronized AsnKnowledgeBase getInstance() { + if (instance == null) { + instance = new AsnKnowledgeBase(); + } + return instance; + } + + + + public Boolean updateKnowledgeBase(KnowledgeConfig knowledgeConfig) { + + IpLookupV2.Builder asnLookupBuilder = new IpLookupV2.Builder(false); + + for (int i = 0; i < knowledgeConfig.getFiles().size(); i++) { + try { + KnowLedgeEntity knowLedgeEntity = getMetadata(knowledgeConfig.getFiles().get(i)); + byte[] fileByte = downloadFile(knowLedgeEntity.getPath(), knowLedgeEntity.getIsValid()); + if (calculateSHA256(fileByte).equals(knowLedgeEntity.getSha256())) { + switch (i) { + case 0: + asnLookupBuilder.loadAsnDataFile(new ByteArrayInputStream(fileByte)); + break; + case 1: + asnLookupBuilder.loadDataFilePrivate(new ByteArrayInputStream(fileByte)); + break; + } + } else { + logger.error("check file sha256 error "); + return false; + } + } catch (Exception e) { + logger.error("updateKnowledgeBase error " + e.getMessage()); + return false; + } + } + IpLookupV2 ipLookup = asnLookupBuilder.build(); + venderWithAsnLookup.put(knowledgeConfig.getName(), ipLookup); + return true; + } + @Override + public String functionName() { + return "asnlookup"; + } + + + + + + + + + + private static void test(String ip) { + + IpLookupV2 ipLookup = venderWithAsnLookup.get("TSG"); + System.out.println("cityLookupDetail:" + ipLookup.cityLookupDetail(ip)); + System.out.println("cityLookup:" + ipLookup.cityLookup(ip)); + System.out.println("countryLookup:" + ipLookup.countryLookup(ip)); + System.out.println("locationLookupDetail:" + ipLookup.locationLookupDetail(ip)); + System.out.println("provinceLookup:" + ipLookup.provinceLookup(ip)); + System.out.println("----------------------------"); + + long start1 = System.currentTimeMillis(); + System.out.println( + "administrativeAreaLookupDetail:" + ipLookup.administrativeAreaLookupDetail(ip)); + System.out.println("asnLookupDetail:" + ipLookup.asnLookupDetail(ip)); + System.out.println("cityLookupDetail:" + ipLookup.cityLookupDetail(ip)); + System.out.println("locationLookupDetail:" + ipLookup.locationLookupDetail(ip)); + System.out.println("ispLookup:" + ipLookup.ispLookup(ip)); + System.out.println("cityLatLngLookup:" + ipLookup.cityLatLngLookup(ip)); + long end1 = System.currentTimeMillis(); + System.out.println(end1 - start1); + + System.out.println("--------------------------------------"); + System.out.println("infoLookupToCSV:" + ipLookup.infoLookupToCSV(ip)); + System.out.println("infoLookupToJson:" + ipLookup.ispLookup(ip)); + System.out.println("----------------------------"); + } + + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/IpKnowledgeBase.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/IpKnowledgeBase.java new file mode 100644 index 0000000..2dbba13 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/IpKnowledgeBase.java @@ -0,0 +1,64 @@ +package com.geedgenetworks.core.utils.KnowlegdeBase; + + +import com.geedgenetworks.common.config.KnowledgeConfig; +import com.geedgenetworks.core.pojo.KnowLedgeEntity; +import com.geedgenetworks.utils.IpLookupV2; +import lombok.Getter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.util.*; + + +public class IpKnowledgeBase extends AbstractKnowledgeBase { + + @Getter + private static Map<String, IpLookupV2> venderWithIpLookup = new HashMap<>(); + private static final Logger logger = LoggerFactory.getLogger(IpKnowledgeBase.class); + private static IpKnowledgeBase instance; + + public static synchronized IpKnowledgeBase getInstance() { + if (instance == null) { + instance = new IpKnowledgeBase(); + } + return instance; + } + @Override + public Boolean updateKnowledgeBase(KnowledgeConfig knowledgeConfig) { + IpLookupV2.Builder ipLookupBuilder = new IpLookupV2.Builder(false); + + for (int i = 0; i < knowledgeConfig.getFiles().size(); i++) { + try { + KnowLedgeEntity knowLedgeEntity = getMetadata(knowledgeConfig.getFiles().get(i)); + + + byte[] fileByte = downloadFile(knowLedgeEntity.getPath(), knowLedgeEntity.getIsValid()); + if (calculateSHA256(fileByte).equals(knowLedgeEntity.getSha256())) { + switch (i) { + case 0: + ipLookupBuilder.loadDataFile(new ByteArrayInputStream(fileByte)); + break; + case 1: + ipLookupBuilder.loadDataFilePrivate(new ByteArrayInputStream(fileByte)); + break; + } + } else { + logger.error("check file sha256 error "); + return false; + } + } catch (Exception e) { + logger.error("updateKnowledgeBase error " + e.getMessage()); + return false; + } + } + IpLookupV2 ipLookup = ipLookupBuilder.build(); + venderWithIpLookup.put(knowledgeConfig.getName(), ipLookup); + return true; } + + @Override + public String functionName() { + return "geoiplookup"; + } +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowledgeBaseSchedule.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseSchedule.java index e7c3963..ccbf1a5 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowledgeBaseSchedule.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseSchedule.java @@ -1,27 +1,24 @@ -package com.geedgenetworks.core.utils; +package com.geedgenetworks.core.utils.KnowlegdeBase; -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.TypeReference; import com.geedgenetworks.common.config.CommonConfig; import com.geedgenetworks.common.config.KnowledgeConfig; -import com.geedgenetworks.common.utils.HttpClientUtils; import com.geedgenetworks.core.pojo.KnowLedgeEntity; import lombok.Getter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.net.URI; +import java.lang.reflect.Method; import java.security.NoSuchAlgorithmException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import static com.geedgenetworks.core.utils.ScheduTask.startTask; public class KnowledgeBaseSchedule { + private static Timer timer; + public static Map<String, KnowledgeConfig> knowledgeConfigMap = new ConcurrentHashMap<>(); public static Map<String, List<KnowLedgeEntity>> KnowLedgeEntityMap = new ConcurrentHashMap<>(); @Getter @@ -30,51 +27,57 @@ public class KnowledgeBaseSchedule { private static KnowledgeBaseSchedule instance; - public static synchronized KnowledgeBaseSchedule getInstance() throws Exception { + public static synchronized KnowledgeBaseSchedule getInstance() { if (instance == null) { instance = new KnowledgeBaseSchedule(); knowledgeBaseClassReflect =CommonConfig.KNOWLEDGE_BASE_CLASS_REFLECT; - startTask(); + ScheduTask.startTask(); } return instance; } - public static void updateKnowledgeBase(String name) throws Exception { + public static void updateKnowledgeBase(String name) { try { // 获取私有构造方法 Class<?> cls = Class.forName(knowledgeBaseClassReflect.get(knowledgeConfigMap.get(name).getType())); Constructor<?> constructor = cls.getDeclaredConstructor(); constructor.setAccessible(true); - knowledgeUtil instance = (knowledgeUtil) constructor.newInstance(); + // knowledgeUtil instance = (knowledgeUtil) constructor.newInstance(); + Method createInstanceMethod = cls.getMethod("getInstance"); + AbstractKnowledgeBase instance = (AbstractKnowledgeBase) createInstanceMethod.invoke(null); // 使用实例调用方法 instance.updateKnowledgeBase(knowledgeConfigMap.get(name)); - } catch (NoSuchMethodException | InstantiationException | - IllegalAccessException | InvocationTargetException e) { + } catch (Exception e) { logger.error(e.getMessage()); } } - public static synchronized void initKnowledgeBase(KnowledgeConfig KnowledgeConfig) { + public static synchronized void initKnowledgeBase(KnowledgeConfig KnowledgeConfig) { - if (!knowledgeConfigMap.containsKey(KnowledgeConfig.getName())) { - List<KnowLedgeEntity> knowLedgeEntityList = new ArrayList<>(); - try { - knowledgeConfigMap.put(KnowledgeConfig.getName(), KnowledgeConfig); - updateKnowledgeBase(KnowledgeConfig.getName()); - for (String id : KnowledgeConfig.getFiles()) { - KnowLedgeEntity knowLedgeEntity = getMetadata(id); - knowLedgeEntityList.add(knowLedgeEntity); + if (instance == null) { + getInstance(); + } + if (!knowledgeConfigMap.containsKey(KnowledgeConfig.getName())) { + List<KnowLedgeEntity> knowLedgeEntityList = new ArrayList<>(); + try { + knowledgeConfigMap.put(KnowledgeConfig.getName(), KnowledgeConfig); + updateKnowledgeBase(KnowledgeConfig.getName()); + for (String id : KnowledgeConfig.getFiles()) { + KnowLedgeEntity knowLedgeEntity = getMetadata(id); + knowLedgeEntityList.add(knowLedgeEntity); + } + KnowLedgeEntityMap.put(KnowledgeConfig.getName(), knowLedgeEntityList); + } catch (Exception e) { + logger.error("initKnowledgeBase " + KnowledgeConfig.getName() + " error: " + e.getMessage()); } KnowLedgeEntityMap.put(KnowledgeConfig.getName(), knowLedgeEntityList); - } catch (Exception e) { - logger.error("initKnowledgeBase " + KnowledgeConfig.getName() + " error: " + e.getMessage()); } - KnowLedgeEntityMap.put(KnowledgeConfig.getName(), knowLedgeEntityList); - } + + } private static KnowLedgeEntity getMetadata(String id) { @@ -124,8 +127,37 @@ public class KnowledgeBaseSchedule { + static class ScheduTask extends TimerTask { + @Override + public void run() { + for (Map.Entry<String, KnowledgeConfig> entry : knowledgeConfigMap.entrySet()) { + Boolean flag = null; + try { + flag = checkUpdateMesssage(entry.getValue().getName()); + if(flag){ + updateKnowledgeBase(entry.getKey()); + } + } catch (NoSuchAlgorithmException e) { + logger.error(e.getMessage()); + } catch (Exception e) { + logger.error(e.getMessage()); + } + } + } + + public static void startTask() { + timer = new Timer(); + timer.schedule(new ScheduTask(), 0, 300000); // 每隔300秒执行一次任务 + } + public static void stopTask() { + if (timer != null) { + timer.cancel(); + timer = null; + } + } + } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/ScheduTask.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/ScheduTask.java deleted file mode 100644 index 78e43d7..0000000 --- a/groot-core/src/main/java/com/geedgenetworks/core/utils/ScheduTask.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.geedgenetworks.core.utils; - -import com.geedgenetworks.common.config.KnowledgeConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.security.NoSuchAlgorithmException; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; - -import static com.geedgenetworks.core.utils.KnowledgeBaseSchedule.*; - -public class ScheduTask extends TimerTask { - - private static Timer timer; - private static final Logger logger = LoggerFactory.getLogger(ScheduTask.class); - - @Override - public void run() { - for (Map.Entry<String, KnowledgeConfig> entry : knowledgeConfigMap.entrySet()) { - Boolean flag = null; - try { - flag = checkUpdateMesssage(entry.getValue().getName()); - if(flag){ - updateKnowledgeBase(entry.getKey()); - } - } catch (NoSuchAlgorithmException e) { - logger.error(e.getMessage()); - } catch (Exception e) { - logger.error(e.getMessage()); - } - } - //System.out.println("Static task is running..."); - } - - public static void startTask() { - timer = new Timer(); - timer.schedule(new ScheduTask(), 0, 300000); // 每隔300秒执行一次任务 - } - - public static void stopTask() { - if (timer != null) { - timer.cancel(); - timer = null; - } - } -} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/knowledgeUtil.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/knowledgeUtil.java deleted file mode 100644 index 9c9aef4..0000000 --- a/groot-core/src/main/java/com/geedgenetworks/core/utils/knowledgeUtil.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.geedgenetworks.core.utils; - -import com.geedgenetworks.common.config.KnowledgeConfig; - -public abstract class knowledgeUtil { - - - abstract Boolean updateKnowledgeBase(KnowledgeConfig knowledgeConfig) throws Exception; - - abstract String functionName(); -} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/knowledgeUtils.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/knowledgeUtils.java deleted file mode 100644 index 24dd632..0000000 --- a/groot-core/src/main/java/com/geedgenetworks/core/utils/knowledgeUtils.java +++ /dev/null @@ -1,200 +0,0 @@ -package com.geedgenetworks.core.utils; - -import com.alibaba.nacos.api.NacosFactory; -import com.alibaba.nacos.api.PropertyKeyConst; -import com.alibaba.nacos.api.config.ConfigService; -import com.alibaba.nacos.api.config.listener.Listener; -import com.alibaba.nacos.api.exception.NacosException; -import com.geedgenetworks.common.config.CommonConfig; -import com.geedgenetworks.common.utils.HttpClientUtils; -import com.geedgenetworks.core.pojo.KnowLedgeEntity; -import com.geedgenetworks.utils.StringUtil; -import com.jayway.jsonpath.JsonPath; -import org.apache.http.Header; -import org.apache.http.message.BasicHeader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.InputStream; -import java.text.SimpleDateFormat; -import java.util.*; -import java.util.concurrent.Executor; - -public class knowledgeUtils { - - private static final String KNOWLEDGE_EXPR; - private static Map<String, String> knowLedgeFileSha256 = new HashMap<>(); - private static final Logger logger = LoggerFactory.getLogger(knowledgeUtils.class); - - static { - StringBuilder sb = new StringBuilder(""); - sb.append( - "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined','asn_v4','asn_v6'] && @.id in ['"); - // sb.append(commonConfig.ID_ARRAY); - /* sb.append(commonConfig.IPV4_USER_DEFINED_ID); - sb.append("','"); - sb.append(commonConfig.IPV6_USER_DEFINED_ID); - sb.append("','"); - sb.append(commonConfig.IPV4_BUILT_IN_ID); - sb.append("','"); - sb.append(commonConfig.IPV6_BUILT_IN_ID); - sb.append("','"); - sb.append(commonConfig.ASN_V4_ID); - sb.append("','"); - sb.append(commonConfig.ASN_V6_ID);*/ - sb.append("'])].['id','name','sha256','format','path','type','isValid']"); - KNOWLEDGE_EXPR = sb.toString(); - System.out.println("listening nacos config"); - Properties properties = new Properties(); - properties.put(PropertyKeyConst.SERVER_ADDR, CommonConfig.NACOS_SERVER_ADDR); - properties.setProperty(PropertyKeyConst.USERNAME, CommonConfig.NACOS_USERNAME); - properties.setProperty(PropertyKeyConst.PASSWORD, CommonConfig.NACOS_PASSWORD); - if (!StringUtil.isEmpty(CommonConfig.NACOS_NAMESPACE)) { - properties.setProperty(PropertyKeyConst.NAMESPACE, CommonConfig.NACOS_NAMESPACE); - } - Properties nacosProperties = properties; - String dataId = CommonConfig.NACOS_DATA_ID; - String group = CommonConfig.NACOS_GROUP; - long readTimeout = CommonConfig.NACOS_READ_TIMEOUT; - // 初始化定位库缓存 - logger.info("connect nacos:" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR)); - try { - ConfigService configService = NacosFactory.createConfigService(nacosProperties); - String nacosInfo = configService.getConfig(dataId, group, readTimeout); - processNacosInfo(nacosInfo); - - configService.addListener( - dataId, - group, - new Listener() { - @Override - public Executor getExecutor() { - return null; - } - - @Override - public void receiveConfigInfo(String nacosInfo) { - processNacosInfo(nacosInfo); - } - }); - } catch (NacosException e) { - e.printStackTrace(); - } - } - - public static void initKnowledge() {} - - private static void processNacosInfo(String nacosInfo) { - SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - String currentDate = formatter.format(new Date()); - System.out.println(currentDate + ": receive nacos config:" + nacosInfo); - ArrayList<Map<String, Object>> metaList = JsonPath.parse(nacosInfo).read(KNOWLEDGE_EXPR); - System.out.println( - "resolve " + metaList.size() + " metadata from the config info pushed by nacos"); - loadKnowledge(metaList); - } - - private static void loadKnowledge(ArrayList<Map<String, Object>> metaList) { - Map<String, KnowLedgeEntity> updateMap = new HashMap<>(); - /* - List<String> tagTypes = new ArrayList<>(3); - tagTypes.add("cn_ip_tag_user_defined"); - tagTypes.add("cn_domain_tag_user_defined"); - tagTypes.add("cn_app_tag_user_defined");*/ - - for (Map<String, Object> metadata : metaList) { - - String sha256 = (String) metadata.get("sha256"); - String fileUrl = (String) metadata.get("path"); - String id = (String) metadata.get("id"); - int isValid = (Integer) metadata.get("isValid"); - - if (isValid != 1) { - sha256 = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; - } - - /* try { - if (!knowLedgeFileSha256.containsKey(id)) { - byte[] downloadBytes = downloadFile(fileUrl, sha256, isValid); - KnowLedgeEntity knowLedgeEntity = new KnowLedgeEntity(); - knowLedgeEntity.setDownloadBytes(downloadBytes); - knowLedgeEntity.setIsValid(isValid); - knowLedgeEntity.setSha256(sha256); - knowLedgeFileSha256.put(id, sha256); - updateMap.put(id, knowLedgeEntity); - - } else { - if (!knowLedgeFileSha256.get(id).equals(sha256)) { - byte[] downloadBytes = downloadFile(fileUrl, sha256, isValid); - KnowLedgeEntity knowLedgeEntity = new KnowLedgeEntity(); - knowLedgeEntity.setDownloadBytes(downloadBytes); - knowLedgeEntity.setIsValid(isValid); - knowLedgeEntity.setSha256(sha256); - knowLedgeFileSha256.put(id, sha256); - updateMap.put(id, knowLedgeEntity); - - } - - - } - - - - } catch (Exception e) { - System.out.println("process file " + id + " error, error message:" + e.getMessage()); - e.printStackTrace(); - }*/ - } - if (updateMap.size() > 0) { - - // IPUtils.loadIpLook(updateMap); - // csvUtils.loadIpLook(updateMap); - } - } - - private static InputStream downloadFile(String fileUrl) { - InputStream inputStream = null; - byte[] content = new byte[0]; - try { - long start = System.currentTimeMillis(); - Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN); - HttpClientUtils httpClientUtils = new HttpClientUtils(); - inputStream = httpClientUtils.httpGetInputStream(fileUrl, 3000, header); - // content = IOUtils.toByteArray(inputStream); - long end = System.currentTimeMillis(); - System.out.println( - "download file " + fileUrl + " finished, speed " + (end - start) + " ms"); - } catch (Exception e) { - System.out.println("download file " + fileUrl + " error: " + e.getMessage()); - } finally { - // IOUtils.closeQuietly(inputStream); - } - return inputStream; - } - - /* private static byte[] downloadFile(String fileUrl, String sha256, int isValid) { - byte[] content = new byte[0]; - if (isValid != 1) { - return content; - } - String downloadFileSha256; - int downloadCount = 0; - do { - content = downloadFile(fileUrl); - // downloadFileSha256 = DigestUtil.sha256Hex(content); - downloadCount++; - } while (!downloadFileSha256.equals(sha256) && downloadCount < commonConfig.KNOWLEDGE_FILE_CHECK_NUMBER); - if (downloadCount >= commonConfig.KNOWLEDGE_FILE_CHECK_NUMBER) { - System.out.println("warning: file url: " + fileUrl + " download more than specified number of times"); - } - return content; - }*/ - - public static void main(String[] args) throws InterruptedException { - System.out.println(knowLedgeFileSha256); - System.out.println(knowLedgeFileSha256); - while (true) { - Thread.sleep(10000); - } - } -} diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java index be1e892..a4b00c1 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java @@ -4,7 +4,7 @@ import com.geedgenetworks.common.config.KnowledgeConfig; import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.core.pojo.UDFContext; import com.geedgenetworks.core.udf.AsnLookup; -import com.geedgenetworks.core.utils.ASNKnowledgeBase; +import com.geedgenetworks.core.utils.KnowlegdeBase.AsnKnowledgeBase; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -31,6 +31,8 @@ public class AsnLookupFunctionTest { udfContext.setOutput_fields(Arrays.asList("asn")); } + + @Test public void testInit(){ AsnLookup asnLookup = new AsnLookup(); @@ -68,13 +70,13 @@ public class AsnLookupFunctionTest { knowledgeConfig.setName("tsg_asnlookup"); knowledgeConfig.setType("asnlookup"); knowledgeConfig.setFiles(Arrays.asList("7ce2f9890950ba90-fcc25696bf11a8a0","7ce2f9890950ba90-71f13b3736863ddb")); - ASNKnowledgeBase asnKnowledgeBase =ASNKnowledgeBase.getInstance(); + AsnKnowledgeBase asnKnowledgeBase = AsnKnowledgeBase.getInstance(); asnKnowledgeBase.updateKnowledgeBase(knowledgeConfig); - String asn = ASNKnowledgeBase.getVenderWithAsnLookup() + String asn = AsnKnowledgeBase.getVenderWithAsnLookup() .get("tsg_asnlookup") .asnLookup("2600:1015:b002::"); - assertEquals("6167", asn); + // assertEquals("6167", asn); } diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java new file mode 100644 index 0000000..ab23331 --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java @@ -0,0 +1,105 @@ +package com.geedgenetworks.core.udf.test; + +import com.geedgenetworks.common.config.KnowledgeConfig; +import com.geedgenetworks.common.exception.GrootStreamRuntimeException; +import com.geedgenetworks.core.pojo.UDFContext; +import com.geedgenetworks.core.udf.GeoIpLookup; +import com.geedgenetworks.core.utils.KnowlegdeBase.IpKnowledgeBase; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class GeoIpLookupFunctionTest { + + private static UDFContext udfContext; + + private static Map<String, Object> parameters ; + + @BeforeAll + public static void setUp() { + udfContext = new UDFContext(); + parameters = new HashMap<>(); + udfContext.setParameters(parameters); + udfContext.setLookup_fields(Arrays.asList("ip")); + udfContext.setOutput_fields(Arrays.asList("iplocation")); + } + +//com.geedgenetworks.core.udf.GeoIpLookup单元测试 + @Test + public void testInit(){ + GeoIpLookup geoIpLookup = new GeoIpLookup(); + udfContext.setLookup_fields(new ArrayList<>()); + udfContext.setParameters(new HashMap<>()); + udfContext.setParameters(null); + Assertions.assertThrows(GrootStreamRuntimeException.class, () -> { + geoIpLookup.open(null, udfContext); + }); + udfContext.setParameters(new HashMap<>()); + udfContext.getParameters().put("option","IP_TO_CITY"); + Assertions.assertThrows(GrootStreamRuntimeException.class, () -> { + geoIpLookup.open(null, udfContext); + }); + + udfContext.setLookup_fields(new ArrayList<>()); + udfContext.getLookup_fields().add("v1"); + udfContext.setOutput_fields(new ArrayList<>()); + udfContext.getOutput_fields().add("v2"); + udfContext.setParameters(new HashMap<>()); + udfContext.getParameters().put("option","other"); + Assertions.assertThrows(GrootStreamRuntimeException.class, () -> { + geoIpLookup.open(null, udfContext); + }); + + } + + + + @Test + public void testAsnLookupFunctionIpToAsn() throws Exception { + + /* udfContext.setParameters(new HashMap<>()); + udfContext.getParameters().put("option","IP_TO_ASN"); + udfContext.getParameters().put("vendor_id","tsg_asnlookup");*/ + + KnowledgeConfig knowledgeConfig =new KnowledgeConfig(); + knowledgeConfig.setName("tsg_geoiplookup"); + knowledgeConfig.setType("geoiplookup"); + knowledgeConfig.setFiles(Arrays.asList("acf1db8589c5e277-ead1a65e1c3973dc","acf1db8589c5e277-ead1a65e1c3973dc")); + IpKnowledgeBase ipKnowledgeBase = IpKnowledgeBase.getInstance(); + ipKnowledgeBase.updateKnowledgeBase(knowledgeConfig); + String countryLookup = IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").countryLookup("2600:1015:b002::"); + String provinceLookup = IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").provinceLookup("2600:1015:b002::"); + String cityLookup = IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").cityLookup("2600:1015:b002::"); + String cityLookupDetail = IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").cityLookupDetail("2600:1015:b002::"); + String locationLookupDetail = IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").locationLookupDetail("2600:1015:b002::"); + String latLngLookup = IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").latLngLookup("2600:1015:b002::"); + String ispLookup = IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").ispLookup("2600:1015:b002::"); + String infoLookupToJSONString = IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").infoLookupToJSONString("2600:1015:b002::"); + //String infoLookup = (String) IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").infoLookup("2600:1015:b002::"); + + + System.out.println("countryLookup:" + countryLookup); + System.out.println("provinceLookup:" + provinceLookup); + System.out.println("cityLookup:" + cityLookup); + System.out.println("cityLookupDetail:" + cityLookupDetail); + System.out.println("locationLookupDetail:" + locationLookupDetail); + System.out.println("latLngLookup:" + latLngLookup); + System.out.println("ispLookup:" + ispLookup); + System.out.println("infoLookupToJSONString:" + infoLookupToJSONString); + System.out.println("infoLookup:" + IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").infoLookup("2600:1015:b002::")); + System.out.println("----------------------------"); + + + // assertEquals("6167", asn); + + } + + +} diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampConverterTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampConverterTest.java new file mode 100644 index 0000000..4edbba1 --- /dev/null +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampConverterTest.java @@ -0,0 +1,99 @@ +package com.geedgenetworks.core.udf.test.simple; + +import com.geedgenetworks.core.pojo.Event; +import com.geedgenetworks.core.pojo.UDFContext; +import com.geedgenetworks.core.udf.FromUnixTimestamp; +import com.geedgenetworks.core.udf.UnixTimestampConverter; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class FromUnixTimestampConverterTest { + + private static UDFContext udfContext; + + @BeforeAll + public static void setUp() { + udfContext = new UDFContext(); + udfContext.setLookup_fields(Arrays.asList("input")); + udfContext.setOutput_fields(Arrays.asList("output")); + } + @Test + public void testFromUnixTimestampFunctionMstoS() throws Exception { + + Map<String, Object> parameters = new HashMap<>(); + parameters.put("precision", "seconds"); + udfContext.setParameters(parameters); + UnixTimestampConverter unixTimestampConverter = new UnixTimestampConverter(); + unixTimestampConverter.open(null, udfContext); + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("input", 1577808000000L); + event.setExtractedFields(extractedFields); + Event result1 = unixTimestampConverter.evaluate(event); + assertEquals(1577808000L, result1.getExtractedFields().get("output")); + + + } + + @Test + public void testFromUnixTimestampFunctionStoMs() throws Exception { + + Map<String, Object> parameters = new HashMap<>(); + parameters.put("precision", "milliseconds"); + udfContext.setParameters(parameters); + UnixTimestampConverter unixTimestampConverter = new UnixTimestampConverter(); + unixTimestampConverter.open(null, udfContext); + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("input", 1577808000L); + event.setExtractedFields(extractedFields); + Event result1 = unixTimestampConverter.evaluate(event); + assertEquals(1577808000000L, result1.getExtractedFields().get("output")); + + + } + + + @Test + public void testFromUnixTimestampFunctionStoS() throws Exception { + + Map<String, Object> parameters = new HashMap<>(); + parameters.put("precision", "seconds"); + udfContext.setParameters(parameters); + UnixTimestampConverter unixTimestampConverter = new UnixTimestampConverter(); + unixTimestampConverter.open(null, udfContext); + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("input", 1577808000L); + event.setExtractedFields(extractedFields); + Event result1 = unixTimestampConverter.evaluate(event); + assertEquals(1577808000L, result1.getExtractedFields().get("output")); + + + } + + + @Test + public void testFromUnixTimestampFunctionMstoMs() throws Exception { + + Map<String, Object> parameters = new HashMap<>(); + parameters.put("precision", "milliseconds"); + udfContext.setParameters(parameters); + UnixTimestampConverter unixTimestampConverter = new UnixTimestampConverter(); + unixTimestampConverter.open(null, udfContext); + Event event = new Event(); + Map<String, Object> extractedFields = new HashMap<>(); + extractedFields.put("input", 1577808000000L); + event.setExtractedFields(extractedFields); + Event result1 = unixTimestampConverter.evaluate(event); + assertEquals(1577808000000L, result1.getExtractedFields().get("output")); + + + } +} diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/ReNameFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/ReNameFunctionTest.java index e2e6d66..d55d2a6 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/ReNameFunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/ReNameFunctionTest.java @@ -3,9 +3,6 @@ package com.geedgenetworks.core.udf.test.simple; import com.geedgenetworks.core.pojo.Event; import com.geedgenetworks.core.pojo.UDFContext; import com.geedgenetworks.core.udf.Rename; -import com.geedgenetworks.core.utils.ASNKnowledgeBase; -import com.geedgenetworks.core.utils.KnowledgeBaseSchedule; -import com.geedgenetworks.utils.IpLookupV2; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; |
