summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwangkuan <[email protected]>2023-12-08 19:07:58 +0800
committerwangkuan <[email protected]>2023-12-08 19:07:58 +0800
commit4fc6b233b2ef82f22ce362c56b3656909f579c38 (patch)
treec78bb770edd8dcab2624f57834e45a52045817ca
parent770273ae65d55dd84e18e546ecd45aa8040464f8 (diff)
[improve][core][common]部分命名问题,新增部分函数
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/EngineConfig.java10
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/ServerConfigOptions.java42
-rw-r--r--groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamDomConfigProcessor.java45
-rw-r--r--groot-common/src/main/resources/groot-platform-knowledge-base-plugin3
-rw-r--r--groot-common/src/main/resources/groot-platform-plugin2
-rw-r--r--groot-common/src/main/resources/grootstream.yaml4
-rw-r--r--groot-common/src/test/java/com/geedgenetworks/common/config/YamlGrootStreamConfigParserTest.java1
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java2
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java9
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/Combine.java70
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/udf/IpLocation.java)118
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java86
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java84
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/ASNKnowledgeBase.java156
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/IPKnowledgeBase.java104
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/IpLookupUtils.java246
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java78
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java106
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/IpKnowledgeBase.java64
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseSchedule.java (renamed from groot-core/src/main/java/com/geedgenetworks/core/utils/KnowledgeBaseSchedule.java)84
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/ScheduTask.java48
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/knowledgeUtil.java11
-rw-r--r--groot-core/src/main/java/com/geedgenetworks/core/utils/knowledgeUtils.java200
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java10
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java105
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampConverterTest.java99
-rw-r--r--groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/ReNameFunctionTest.java3
27 files changed, 805 insertions, 985 deletions
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/EngineConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/EngineConfig.java
index a5ac06e..3591ce6 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/EngineConfig.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/EngineConfig.java
@@ -13,23 +13,15 @@ import static com.google.common.base.Preconditions.checkNotNull;
@Data
public class EngineConfig implements Serializable {
- private int tickTupleFreqSecs = ServerConfigOptions.TICK_TUPLE_FREQ_SECS.defaultValue();
- private int gtpcScanMaxRows = ServerConfigOptions.GTPC_SCAN_MAX_ROWS.defaultValue();
- private int radiusScanMaxRows = ServerConfigOptions.RADIUS_SCAN_MAX_ROWS.defaultValue();
- private String radiusTableName = ServerConfigOptions.RADIUS_TABLE_NAME.defaultValue();
- private String gtpcTableName = ServerConfigOptions.GTPC_TABLE_NAME.defaultValue();
- private int hbaseRpcTimeout = ServerConfigOptions.HBASE_RPC_TIMEOUT.defaultValue();
+
private HttpConPoolConfig httpConPoolConfig = ServerConfigOptions.HTTP_CON_POOL.defaultValue();
private List<KnowledgeConfig> knowledgeBaseConfig = ServerConfigOptions.KNOWLEDGE_BASE.defaultValue();
- private SnowflakeConfig snowflakeConfig = ServerConfigOptions.SNOWFLAKE.defaultValue();
private NacosConfig nacosConfig = ServerConfigOptions.NACOS.defaultValue();
private ConsulConfig consulConfig = ServerConfigOptions.CONSUL.defaultValue();
- private HosConfig hosConfig = ServerConfigOptions.HOS.defaultValue();
private ZookeeperConfig zookeeperConfig = ServerConfigOptions.ZOOKEEPER.defaultValue();
private HdfsConfig hdfsConfig = ServerConfigOptions.HDFS.defaultValue();
-
public void setKnowledgeBaseConfig(List<KnowledgeConfig> knowledgeBaseConfig) {
checkNotNull(knowledgeBaseConfig, ServerConfigOptions.KNOWLEDGE_BASE + "knowledgeConfig should not be null");
this.knowledgeBaseConfig = knowledgeBaseConfig;
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/ServerConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/ServerConfigOptions.java
index 4cce3a0..04bc7ec 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/ServerConfigOptions.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/ServerConfigOptions.java
@@ -9,41 +9,6 @@ import java.util.Map;
public class ServerConfigOptions {
- public static final Option<Integer> TICK_TUPLE_FREQ_SECS =
- Options.key("tick_tuple_freq_secs")
- .intType()
- .defaultValue(60)
- .withDescription("The frequency of tick tuple (in seconds).");
-
- public static final Option<Integer> GTPC_SCAN_MAX_ROWS =
- Options.key("gtpc_scan_max_rows")
- .intType()
- .defaultValue(10000)
- .withDescription("The max rows of gtpc scan.");
-
- public static final Option<Integer> RADIUS_SCAN_MAX_ROWS =
- Options.key("radius_scan_max_rows")
- .intType()
- .defaultValue(10000)
- .withDescription("The max rows of radius scan.");
-
- public static final Option<String> RADIUS_TABLE_NAME =
- Options.key("radius_table_name")
- .stringType()
- .defaultValue("RADIUS-TABLE")
- .withDescription("The table name of radius.");
-
- public static final Option<String> GTPC_TABLE_NAME =
- Options.key("gtpc_table_name")
- .stringType()
- .defaultValue("GTPC-TABLE")
- .withDescription("The table name of gtpc.");
-
- public static final Option<Integer> HBASE_RPC_TIMEOUT =
- Options.key("hbase_rpc_timeout")
- .intType()
- .defaultValue(60000)
- .withDescription("The rpc timeout of hbase.");
public static final Option<Integer> HTTP_CON_POOL_MAX_TOTAL =
Options.key("max_total")
@@ -101,10 +66,6 @@ public class ServerConfigOptions {
.defaultValue(new ArrayList<>())
.withDescription("The files of knowledgebase.");
- public static final Option<List<KnowledgeConfig>> KNOWLEDGE_BASE_ASN_LOOKUP = Options.key("asnlookup")
- .type(new TypeReference<List<KnowledgeConfig>>() {})
- .noDefaultValue()
- .withDescription("The asn lookup configuration.");
public static final Option<String> KNOWLEDGE_BASE_STORAGE_FS_TYPE = Options.key("fs_type")
.stringType()
@@ -116,9 +77,6 @@ public class ServerConfigOptions {
.defaultValue("")
.withDescription("The default path of knowledge base storage.");
- public static final Option<StorageConfig> KNOWLEDGE_BASE_STORAGE = Options.key("storage").type(new TypeReference<StorageConfig>() {})
- .defaultValue(new StorageConfig())
- .withDescription("The storage configuration of knowledge base.");
public static final Option<List<KnowledgeConfig>> KNOWLEDGE_BASE =
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamDomConfigProcessor.java b/groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamDomConfigProcessor.java
index 558bf69..ccdd58a 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamDomConfigProcessor.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamDomConfigProcessor.java
@@ -53,30 +53,14 @@ public class YamlGrootStreamDomConfigProcessor extends AbstractDomConfigProcesso
final EngineConfig engineConfig = config.getEngineConfig();
for (Node node : childElements(engineNode)) {
String name = cleanNodeName(node);
- if (ServerConfigOptions.TICK_TUPLE_FREQ_SECS.key().equals(name)) {
- engineConfig.setTickTupleFreqSecs(getIntegerValue(ServerConfigOptions.TICK_TUPLE_FREQ_SECS.key(), getTextContent(node)));
- } else if (ServerConfigOptions.GTPC_SCAN_MAX_ROWS.key().equals(name)) {
- engineConfig.setGtpcScanMaxRows(getIntegerValue(ServerConfigOptions.GTPC_SCAN_MAX_ROWS.key(), getTextContent(node)));
- } else if (ServerConfigOptions.RADIUS_SCAN_MAX_ROWS.key().equals(name)) {
- engineConfig.setRadiusScanMaxRows(getIntegerValue(ServerConfigOptions.RADIUS_SCAN_MAX_ROWS.key(), getTextContent(node)));
- } else if (ServerConfigOptions.RADIUS_TABLE_NAME.key().equals(name)) {
- engineConfig.setRadiusTableName(getTextContent(node));
- } else if (ServerConfigOptions.GTPC_TABLE_NAME.key().equals(name)) {
- engineConfig.setGtpcTableName(getTextContent(node));
- } else if (ServerConfigOptions.HBASE_RPC_TIMEOUT.key().equals(name)) {
- engineConfig.setHbaseRpcTimeout(getIntegerValue(ServerConfigOptions.HBASE_RPC_TIMEOUT.key(), getTextContent(node)));
- } else if (ServerConfigOptions.HTTP_CON_POOL.key().equals(name)) {
+ if (ServerConfigOptions.HTTP_CON_POOL.key().equals(name)) {
engineConfig.setHttpConPoolConfig(parseHttpConPoolConfig(node));
} else if (ServerConfigOptions. KNOWLEDGE_BASE.key().equals(name)) {
engineConfig.setKnowledgeBaseConfig(parseKnowledgeBaseConfig(node));
- } else if (ServerConfigOptions.SNOWFLAKE.key().equals(name)) {
- engineConfig.setSnowflakeConfig(parseSnowflakeConfig(node));
} else if (ServerConfigOptions.NACOS.key().equals(name)) {
engineConfig.setNacosConfig(parseNacosConfig(node));
} else if (ServerConfigOptions.CONSUL.key().equals(name)) {
engineConfig.setConsulConfig(parseConsulConfig(node));
- } else if (ServerConfigOptions.HOS.key().equals(name)) {
- engineConfig.setHosConfig(parseHosConfig(node));
} else if (ServerConfigOptions.ZOOKEEPER.key().equals(name)) {
engineConfig.setZookeeperConfig(parseZookeeperConfig(node));
} else if (ServerConfigOptions.HDFS.key().equals(name)) {
@@ -89,18 +73,6 @@ public class YamlGrootStreamDomConfigProcessor extends AbstractDomConfigProcesso
}
}
- private HosConfig parseHosConfig(Node hosNode) {
- HosConfig hosConfig = new HosConfig();
- for (Node node : childElements(hosNode)) {
- String name = cleanNodeName(node);
- if (ServerConfigOptions.HOS_TOKEN.key().equals(name)) {
- hosConfig.setToken(getTextContent(node));
- } else {
- LOGGER.warning("Unrecognized configuration element: " + name);
- }
- }
- return hosConfig;
- }
private ZookeeperConfig parseZookeeperConfig(Node zookeeperNode) {
ZookeeperConfig zookeeperConfig = new ZookeeperConfig();
@@ -170,18 +142,7 @@ public class YamlGrootStreamDomConfigProcessor extends AbstractDomConfigProcesso
return nacosConfig;
}
- private SnowflakeConfig parseSnowflakeConfig(Node snowflakeNode) {
- SnowflakeConfig snowflakeConfig = new SnowflakeConfig();
- for (Node node : childElements(snowflakeNode)) {
- String name = cleanNodeName(node);
- if (ServerConfigOptions.SNOWFLAKE_DATA_CENTER_ID.key().equals(name)) {
- snowflakeConfig.setDataCenterId(getTextContent(node));
- } else {
- LOGGER.warning("Unrecognized configuration element: " + name);
- }
- }
- return snowflakeConfig;
- }
+
private HttpConPoolConfig parseHttpConPoolConfig(Node httpConPoolNode) {
HttpConPoolConfig httpConPoolConfig = new HttpConPoolConfig();
for (Node node : childElements(httpConPoolNode)) {
@@ -213,8 +174,6 @@ public class YamlGrootStreamDomConfigProcessor extends AbstractDomConfigProcesso
return knowledgeConfigList;
}
-
-
private KnowledgeConfig parseKnowledgeConfig(Node asnLookupNode) {
KnowledgeConfig knowledgeConfig = new KnowledgeConfig();
for (Node node : childElements(asnLookupNode)) {
diff --git a/groot-common/src/main/resources/groot-platform-knowledge-base-plugin b/groot-common/src/main/resources/groot-platform-knowledge-base-plugin
index b3f1204..c8a3f7c 100644
--- a/groot-common/src/main/resources/groot-platform-knowledge-base-plugin
+++ b/groot-common/src/main/resources/groot-platform-knowledge-base-plugin
@@ -1 +1,2 @@
-com.geedgenetworks.core.utils.ASNKnowledgeBase \ No newline at end of file
+com.geedgenetworks.core.utils.KnowlegdeBase.AsnKnowledgeBase
+com.geedgenetworks.core.utils.KnowlegdeBase.IpKnowledgeBase \ No newline at end of file
diff --git a/groot-common/src/main/resources/groot-platform-plugin b/groot-common/src/main/resources/groot-platform-plugin
index 041a20c..bebf1e9 100644
--- a/groot-common/src/main/resources/groot-platform-plugin
+++ b/groot-common/src/main/resources/groot-platform-plugin
@@ -6,4 +6,4 @@ com.geedgenetworks.core.udf.JsonExtract
com.geedgenetworks.core.udf.UnixTimestamp
com.geedgenetworks.core.udf.Domain
com.geedgenetworks.core.udf.DecodeBase64
-com.geedgenetworks.core.udf.IpLocation \ No newline at end of file
+com.geedgenetworks.core.udf.GeoIpLookup \ No newline at end of file
diff --git a/groot-common/src/main/resources/grootstream.yaml b/groot-common/src/main/resources/grootstream.yaml
index 153529a..d0906bc 100644
--- a/groot-common/src/main/resources/grootstream.yaml
+++ b/groot-common/src/main/resources/grootstream.yaml
@@ -21,8 +21,8 @@ grootstream:
files:
default_mmdb_file: 7ce2f9890950ba90-fcc25696bf11a8a0
user_defined_mmdb_file: 7ce2f9890950ba90-71f13b3736863ddb
- - name: cn_asnlookup
- type: asnlookup
+ - name: tsg_geoiplookup
+ type: geoiplookup
files:
default_mmdb_file: 7ce2f9890950ba90-fcc25696bf11a8a0
user_defined_mmdb_file: 7ce2f9890950ba90-71f13b3736863ddb
diff --git a/groot-common/src/test/java/com/geedgenetworks/common/config/YamlGrootStreamConfigParserTest.java b/groot-common/src/test/java/com/geedgenetworks/common/config/YamlGrootStreamConfigParserTest.java
index c9634c8..f6f27de 100644
--- a/groot-common/src/test/java/com/geedgenetworks/common/config/YamlGrootStreamConfigParserTest.java
+++ b/groot-common/src/test/java/com/geedgenetworks/common/config/YamlGrootStreamConfigParserTest.java
@@ -16,7 +16,6 @@ public class YamlGrootStreamConfigParserTest {
throw new RuntimeException("can't find yaml in resources");
}
Assertions.assertNotNull(config);
- Assertions.assertEquals(90, config.getEngineConfig().getTickTupleFreqSecs());
Assertions.assertNotNull(config.getEngineConfig().getNacosConfig().getServerAddr());
Assertions.assertTrue(config.getEngineConfig().getHttpConPoolConfig().getSocketTimeout() > 0);
Assertions.assertNotNull(config.getEngineConfig().getKnowledgeBaseConfig());
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
index 22abbfc..a81bbd3 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
@@ -10,7 +10,7 @@ import com.geedgenetworks.core.pojo.Event;
import com.geedgenetworks.core.pojo.ProjectionConfig;
import com.geedgenetworks.core.pojo.UDFContext;
import com.geedgenetworks.core.udf.UDF;
-import com.geedgenetworks.core.utils.KnowledgeBaseSchedule;
+import com.geedgenetworks.core.utils.KnowlegdeBase.KnowledgeBaseSchedule;
import com.googlecode.aviator.AviatorEvaluator;
import com.googlecode.aviator.AviatorEvaluatorInstance;
import com.googlecode.aviator.Expression;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
index 8c0ce93..14897ff 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
@@ -8,11 +8,11 @@ import com.geedgenetworks.common.exception.CommonErrorCode;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.core.pojo.Event;
import com.geedgenetworks.core.pojo.UDFContext;
-import com.geedgenetworks.core.utils.ASNKnowledgeBase;
+import com.geedgenetworks.core.utils.KnowlegdeBase.AsnKnowledgeBase;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.geedgenetworks.core.utils.KnowledgeBaseSchedule;
+import com.geedgenetworks.core.utils.KnowlegdeBase.KnowledgeBaseSchedule;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
@@ -37,6 +37,9 @@ public class AsnLookup implements UDF {
if(vender.equals(knowledgeConfig.getName())){
KnowledgeBaseSchedule.initKnowledgeBase(knowledgeConfig);
}
+ else {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param vendor_id value is not correct");
+ }
}
}
@@ -51,7 +54,7 @@ public class AsnLookup implements UDF {
event.getExtractedFields()
.put(
udfContext.getOutput_fields().get(0),
- ASNKnowledgeBase.getVenderWithAsnLookup()
+ AsnKnowledgeBase.getVenderWithAsnLookup()
.get(vender)
.asnLookup(
event.getExtractedFields()
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Combine.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Combine.java
deleted file mode 100644
index 2080768..0000000
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Combine.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package com.geedgenetworks.core.udf;
-
-import com.geedgenetworks.core.pojo.Event;
-import com.geedgenetworks.core.pojo.UDFContext;
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Map;
-
-public class Combine implements UDF {
-
- private UDFContext udfContext;
-
- private StringBuilder stringBuilder;
-
-
- @Override
- public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
-
- this.udfContext = udfContext;
- if (udfContext.getParameters() != null
- && udfContext.getParameters().size() != 0) {
- ArrayList<String> keys = new ArrayList<>();
- for (Map.Entry<String, Object> entry : udfContext.getParameters().entrySet()) {
- String key = entry.getKey();
- keys.add(key);
- }
- keys.sort(new NaturalNumberStringComparator());
- stringBuilder = new StringBuilder();
- for (String key : keys) {
- stringBuilder.append(udfContext.getParameters().get(key));
- }
- }
- }
-
- @Override
- public Event evaluate(Event event) {
-
- if (event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) {
- stringBuilder.append(
- event.getExtractedFields().get(udfContext.getLookup_fields().get(0)));
- event.getExtractedFields()
- .put(udfContext.getOutput_fields().get(0), stringBuilder.toString());
- }
- return event;
- }
-
- @Override
- public String functionName() {
- return "COMBINE";
- }
-
- @Override
- public void close() {
-
- }
-
- static class NaturalNumberStringComparator implements Comparator<String> {
- @Override
- public int compare(String s1, String s2) {
- String[] s1Array = s1.split("path");
- String[] s2Array = s2.split("path");
- Integer n1 = Integer.parseInt(s1Array[1]);
- Integer n2 = Integer.parseInt(s2Array[1]);
- return n1.compareTo(n2);
- }
- }
-
-}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/IpLocation.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
index 880862a..4f09969 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/IpLocation.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
@@ -1,8 +1,14 @@
package com.geedgenetworks.core.udf;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.config.EngineConfig;
+import com.geedgenetworks.common.config.KnowledgeConfig;
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.core.pojo.Event;
import com.geedgenetworks.core.pojo.UDFContext;
-import com.geedgenetworks.core.utils.IPKnowledgeBase;
+import com.geedgenetworks.core.utils.KnowlegdeBase.IpKnowledgeBase;
+import com.geedgenetworks.core.utils.KnowlegdeBase.KnowledgeBaseSchedule;
import com.geedgenetworks.utils.StringUtil;
import cn.hutool.log.Log;
@@ -10,10 +16,11 @@ import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
import java.util.HashMap;
-public class IpLocation implements UDF {
+public class GeoIpLookup implements UDF {
private UDFContext udfContext;
private static final Log logger = LogFactory.get();
@@ -22,12 +29,25 @@ public class IpLocation implements UDF {
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
+ checkUdfContext(udfContext);
this.udfContext = udfContext;
- this.vender = udfContext.getParameters().get("kb_vender").toString();
- // this.option = udfContext.getParameters().get("option").toString();
- this.option ="IP_TO_COUNTRY";
+ this.vender = udfContext.getParameters().get("vendor_id").toString();
+ this.option = udfContext.getParameters().get("option").toString();
+ Configuration configuration = (Configuration) runtimeContext
+ .getExecutionConfig().getGlobalJobParameters();
+ EngineConfig engineConfig = com.alibaba.fastjson.JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), EngineConfig.class);
+ for(KnowledgeConfig knowledgeConfig : engineConfig.getKnowledgeBaseConfig()){
+ if(vender.equals(knowledgeConfig.getName())){
+ KnowledgeBaseSchedule.initKnowledgeBase(knowledgeConfig);
+ }
+ else {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param vendor_id value is not correct");
+ }
+ }
}
+
+
@Override
public Event evaluate(Event event) {
if (event.getExtractedFields().containsKey(udfContext.getLookup_fields().get(0))) {
@@ -37,7 +57,7 @@ public class IpLocation implements UDF {
event.getExtractedFields()
.put(
udfContext.getOutput_fields().get(0),
- IPKnowledgeBase.venderWithIpLookup
+ IpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.countryLookup(
event.getExtractedFields()
@@ -51,7 +71,7 @@ public class IpLocation implements UDF {
event.getExtractedFields()
.put(
udfContext.getOutput_fields().get(0),
- IPKnowledgeBase.venderWithIpLookup
+ IpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.provinceLookup(
event.getExtractedFields()
@@ -65,7 +85,7 @@ public class IpLocation implements UDF {
event.getExtractedFields()
.put(
udfContext.getOutput_fields().get(0),
- IPKnowledgeBase.venderWithIpLookup
+ IpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.cityLookup(
event.getExtractedFields()
@@ -79,7 +99,7 @@ public class IpLocation implements UDF {
event.getExtractedFields()
.put(
udfContext.getOutput_fields().get(0),
- IPKnowledgeBase.venderWithIpLookup
+ IpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.cityLookupDetail(
event.getExtractedFields()
@@ -93,7 +113,7 @@ public class IpLocation implements UDF {
event.getExtractedFields()
.put(
udfContext.getOutput_fields().get(0),
- IPKnowledgeBase.venderWithIpLookup
+ IpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.locationLookupDetail(
event.getExtractedFields()
@@ -103,9 +123,9 @@ public class IpLocation implements UDF {
.get(0))
.toString()));
break;
- case "IP_TO_GEO":
+ case "IP_TO_LATLNG":
String geo =
- IPKnowledgeBase.venderWithIpLookup
+ IpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
.latLngLookup(
event.getExtractedFields()
@@ -125,9 +145,9 @@ public class IpLocation implements UDF {
case "IP_TO_PROVIDER":
HashMap<String, Object> serverIpMap =
JSON.parseObject(
- IPKnowledgeBase.venderWithIpLookup
+ IpKnowledgeBase.getVenderWithIpLookup()
.get(vender)
- .asnLookupDetail(
+ .ispLookup(
event.getExtractedFields()
.get(
udfContext
@@ -141,6 +161,34 @@ public class IpLocation implements UDF {
udfContext.getOutput_fields().get(0),
serverIpMap.getOrDefault("isp", StringUtil.EMPTY));
break;
+ case "IP_TO_JSON ":
+ event.getExtractedFields()
+ .put(
+ udfContext.getOutput_fields().get(0),
+ IpKnowledgeBase.getVenderWithIpLookup()
+ .get(vender)
+ .infoLookupToJSONString(
+ event.getExtractedFields()
+ .get(
+ udfContext
+ .getLookup_fields()
+ .get(0))
+ .toString()));
+ break;
+ case "IP_TO_OBJECT":
+ event.getExtractedFields()
+ .put(
+ udfContext.getOutput_fields().get(0),
+ IpKnowledgeBase.getVenderWithIpLookup()
+ .get(vender)
+ .infoLookup(
+ event.getExtractedFields()
+ .get(
+ udfContext
+ .getLookup_fields()
+ .get(0))
+ .toString()));
+ break;
default:
break;
}
@@ -148,6 +196,48 @@ public class IpLocation implements UDF {
return event;
}
+ private void checkUdfContext(UDFContext udfContext) {
+ if(udfContext.getLookup_fields().size() != 1){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function lookup fields only support 1 value");
+ }
+ if(udfContext.getOutput_fields().size() != 1){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
+ }
+ if(udfContext.getParameters() == null){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function needs parameters");
+ }
+ if(!udfContext.getParameters().containsKey("vendor_id")){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey vendor_id");
+ }
+ if(!udfContext.getParameters().containsKey("option")){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey option");
+ }
+ else{
+ if(!udfContext.getParameters().get("option").toString().equals("IP_TO_COUNTRY") && //IP_TO_COUNTRY
+ !udfContext.getParameters().get("option").toString().equals("IP_TO_PROVINCE") && //IP_TO_PROVINCE
+ !udfContext.getParameters().get("option").toString().equals("IP_TO_CITY") && //IP_TO_CITY
+ !udfContext.getParameters().get("option").toString().equals("IP_TO_SUBDIVISION_ADDR") && //IP_TO_SUBDIVISION_ADDR
+ !udfContext.getParameters().get("option").toString().equals("IP_TO_DETAIL") && //IP_TO_DETAIL
+ !udfContext.getParameters().get("option").toString().equals("IP_TO_LATLNG") && //IP_TO_LATLNG
+ !udfContext.getParameters().get("option").toString().equals("IP_TO_PROVIDER") && //IP_TO_PROVIDER
+ !udfContext.getParameters().get("option").toString().equals("IP_TO_JSON") && //IP_TO_JSON
+ !udfContext.getParameters().get("option").toString().equals("IP_TO_OBJECT") ){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters option value is not correct");
+ }
+ if(udfContext.getParameters().get("option").toString().equals("IP_TO_OBJECT")){
+
+ if(!udfContext.getParameters().containsKey("geolocation_field_mapping")){
+
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey geolocation_field_mapping");
+
+ }
+
+ }
+ }
+
+ }
+
+
@Override
public String functionName() {
return "IP_LOCATION";
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
new file mode 100644
index 0000000..6af2741
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
@@ -0,0 +1,86 @@
+package com.geedgenetworks.core.udf;
+
+import com.alibaba.fastjson.JSON;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.config.EngineConfig;
+import com.geedgenetworks.core.pojo.Event;
+import com.geedgenetworks.core.pojo.UDFContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+
+import java.util.*;
+
+public class PathCombine implements UDF {
+
+ private UDFContext udfContext;
+
+ private StringBuilder stringBuilder;
+
+ private Map<String, String> pathParameters = new LinkedHashMap<>();
+
+ @Override
+ public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
+
+ this.udfContext = udfContext;
+/*
+ Configuration configuration = (Configuration) runtimeContext
+ .getExecutionConfig().getGlobalJobParameters();
+ EngineConfig engineConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), EngineConfig.class);
+*/
+
+ if (udfContext.getParameters() != null
+ && !udfContext.getParameters().isEmpty()) {
+
+ Object object = udfContext.getParameters().get("path");
+ ArrayList<Object> arrayList = new ArrayList<>(Arrays.asList(object));
+ for (Object key : arrayList) {
+ String column =key.toString();
+ if(column.contains("global")){
+ pathParameters.put(column,"");//待定义全局变量
+ }
+ else {
+ pathParameters.put(column,"dynamic");
+ }
+ }
+
+ ArrayList<String> keys = new ArrayList<>();
+ for (Map.Entry<String, Object> entry : udfContext.getParameters().entrySet()) {
+ String key = entry.getKey();
+ keys.add(key);
+ }
+ stringBuilder = new StringBuilder();
+ for (String key : keys) {
+ stringBuilder.append(udfContext.getParameters().get(key));
+ }
+ }
+ }
+
+
+
+ @Override
+ public Event evaluate(Event event) {
+ StringBuilder stringBuilder = new StringBuilder();
+ for (Map.Entry<String, String> entry : pathParameters.entrySet()) {
+ if (entry.getValue().equals("dynamic")) {
+ stringBuilder.append(event.getExtractedFields().getOrDefault(entry.getKey(),"").toString());
+ }
+ else {
+ stringBuilder.append(entry.getValue());
+ }
+ }
+ event.getExtractedFields().put(udfContext.getOutput_fields().get(0), stringBuilder.toString());
+ return event;
+ }
+
+ @Override
+ public String functionName() {
+ return "PATH_COMBINE";
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java
new file mode 100644
index 0000000..c0ad7f1
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java
@@ -0,0 +1,84 @@
+package com.geedgenetworks.core.udf;
+
+import com.geedgenetworks.common.exception.CommonErrorCode;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.core.pojo.Event;
+import com.geedgenetworks.core.pojo.UDFContext;
+import com.geedgenetworks.utils.FormatUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+import java.time.Instant;
+
+@Slf4j
+public class UnixTimestampConverter implements UDF {
+
+ private UDFContext udfContext;
+
+ private String precision;
+ @Override
+ public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
+
+ this.udfContext = udfContext;
+
+ if(udfContext.getOutput_fields().size() != 1){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function output fields only support 1 value");
+ }
+ if(udfContext.getParameters() == null){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The function must contain parameters");
+ }
+ if(!udfContext.getParameters().containsKey("precision")){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters must containkey precision");
+ }
+ else{
+ if(!udfContext.getParameters().get("precision").toString().equals("seconds") &&
+ !udfContext.getParameters().get("precision").toString().equals("milliseconds")){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters option value is not correct");
+ }else {
+
+ this.precision =udfContext.getParameters().get("precision").toString();
+ }
+ }
+
+
+ }
+
+
+ @Override
+ public Event evaluate(Event event) {
+
+ Long timestamp = Long.parseLong(event.getExtractedFields().get(udfContext.getLookup_fields().get(0)).toString());
+ Instant instant = null;
+ if (String.valueOf(timestamp).length() ==13 ) {
+ // 时间戳长度大于10,表示为毫秒级时间戳
+ instant = Instant.ofEpochMilli(timestamp);
+ } else if(String.valueOf(timestamp).length() ==10){
+ // 时间戳长度小于等于10,表示为秒级时间戳
+ instant = Instant.ofEpochSecond(timestamp);
+ }else {
+ return event;
+ }
+ switch (precision) {
+ case "seconds":
+ timestamp = instant.getEpochSecond();
+ break;
+ case "milliseconds":
+ timestamp = instant.toEpochMilli();
+ break;
+ }
+ event.getExtractedFields().put(udfContext.getOutput_fields().get(0), timestamp);
+ return event;
+
+ }
+
+
+ @Override
+ public String functionName() {
+ return "UNIX_TIMESTAMP_FUNCTION";
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/ASNKnowledgeBase.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/ASNKnowledgeBase.java
deleted file mode 100644
index d906949..0000000
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/ASNKnowledgeBase.java
+++ /dev/null
@@ -1,156 +0,0 @@
-package com.geedgenetworks.core.utils;
-
-
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.TypeReference;
-import com.geedgenetworks.common.config.KnowledgeConfig;
-
-import com.geedgenetworks.common.utils.HttpClientUtils;
-import com.geedgenetworks.core.pojo.KnowLedgeEntity;
-import com.geedgenetworks.utils.IpLookupV2;
-import lombok.Getter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.net.URI;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.*;
-
-import static scala.tools.fusesource_embedded.jansi.AnsiRenderer.test;
-
-
-public class ASNKnowledgeBase extends knowledgeUtil {
-
-
- private static final Logger LOG = LoggerFactory.getLogger(ASNKnowledgeBase.class);
- @Getter
- private static Map<String, IpLookupV2> venderWithAsnLookup = new HashMap<>();
- private static final Logger logger = LoggerFactory.getLogger(ASNKnowledgeBase.class);
- private static ASNKnowledgeBase instance;
- // 私有构造函数,防止外部实例化
-
- public static synchronized ASNKnowledgeBase getInstance() throws Exception {
- if (instance == null) {
- instance = new ASNKnowledgeBase();
- }
- return instance;
- }
-
-
- public Boolean updateKnowledgeBase(KnowledgeConfig knowledgeConfig) throws Exception {
-
- IpLookupV2.Builder asnLookupBuilder = new IpLookupV2.Builder(false);
-
- for (int i = 0; i < knowledgeConfig.getFiles().size(); i++) {
-
- KnowLedgeEntity knowLedgeEntity = getMetadata(knowledgeConfig.getFiles().get(i));
-
- byte[] fileByte = downloadFile(knowLedgeEntity.getPath(), knowLedgeEntity.getIsValid());
-
- if (calculateSHA256(fileByte).equals(knowLedgeEntity.getSha256())) {
- switch (i) {
- case 0:
- asnLookupBuilder.loadAsnDataFile(new ByteArrayInputStream(fileByte));
-
- break;
- case 1:
- asnLookupBuilder.loadDataFilePrivate(new ByteArrayInputStream(fileByte));
-
- break;
- }
- } else {
- logger.error("check file sha256 error ");
- return false;
- }
- }
- IpLookupV2 ipLookup = asnLookupBuilder.build();
- venderWithAsnLookup.put(knowledgeConfig.getName(), ipLookup);
- return true;
- }
-
- @Override
- public String functionName() {
- return "asnlookup";
- }
-
-
- public static String calculateSHA256(byte[] data) throws NoSuchAlgorithmException {
- MessageDigest digest = MessageDigest.getInstance("SHA-256");
-
- StringBuilder result = new StringBuilder();
- for (byte b : digest.digest(data)) {
- result.append(String.format("%02x", b));
- }
- return result.toString();
- }
-
-
- private static KnowLedgeEntity getMetadata(String id) {
-
- try {
-// HttpClientUtils httpClientUtils = new HttpClientUtils();
- // String metadate = httpClientUtils.httpGet(URI.create("/v1/knowledge_base/"+ id +"/meta")) ;
-//临时写死
- KnowLedgeEntity knowLedgeEntity = new KnowLedgeEntity();
- knowLedgeEntity.setIsValid(1);
- knowLedgeEntity.setSha256("2a2defd9c96ea5434ac628d3dc4fc6790e30d84c218b02fe798dbc6772dbf76c");
- knowLedgeEntity.setPath("http://192.168.44.12:9098/hos/knowledge_base_hos_bucket/1e8aabe7-2170-445c-9124-6051332ca7d6-YXNuX2J1aWx0aW4=.mmdb");
- // List<KnowLedgeEntity> knowLedgeEntityList = JSON.parseObject(metadate, new TypeReference<List<KnowLedgeEntity>>() {});
- // return knowLedgeEntityList.get(0);
- return knowLedgeEntity;
- } catch (Exception e) {
- logger.error("get file Metadata " + id + " error: " + e.getMessage());
- }
-
- return null;
- }
-
- private static byte[] downloadFile(String fileUrl, int isValid) {
- byte[] inputStream = null;
- if (isValid == 1) {
- try {
- long start = System.currentTimeMillis();
- HttpClientUtils httpClientUtils = new HttpClientUtils();
- inputStream = httpClientUtils.httpGetByte(fileUrl, 3000);
- long end = System.currentTimeMillis();
- logger.info(
- "download file " + fileUrl + " finished, speed " + (end - start) + " ms");
- } catch (Exception e) {
- logger.error("download file " + fileUrl + " error: " + e.getMessage());
- }
- }
- return inputStream;
- }
-
- private static void test(String ip) {
-
- IpLookupV2 ipLookup = venderWithAsnLookup.get("TSG");
- System.out.println("cityLookupDetail:" + ipLookup.cityLookupDetail(ip));
- System.out.println("cityLookup:" + ipLookup.cityLookup(ip));
- System.out.println("countryLookup:" + ipLookup.countryLookup(ip));
- System.out.println("locationLookupDetail:" + ipLookup.locationLookupDetail(ip));
- System.out.println("provinceLookup:" + ipLookup.provinceLookup(ip));
- System.out.println("----------------------------");
-
- long start1 = System.currentTimeMillis();
- System.out.println(
- "administrativeAreaLookupDetail:" + ipLookup.administrativeAreaLookupDetail(ip));
- System.out.println("asnLookupDetail:" + ipLookup.asnLookupDetail(ip));
- System.out.println("cityLookupDetail:" + ipLookup.cityLookupDetail(ip));
- System.out.println("locationLookupDetail:" + ipLookup.locationLookupDetail(ip));
- System.out.println("ispLookup:" + ipLookup.ispLookup(ip));
- System.out.println("cityLatLngLookup:" + ipLookup.cityLatLngLookup(ip));
- long end1 = System.currentTimeMillis();
- System.out.println(end1 - start1);
-
- System.out.println("--------------------------------------");
- System.out.println("infoLookupToCSV:" + ipLookup.infoLookupToCSV(ip));
- System.out.println("infoLookupToJson:" + ipLookup.ispLookup(ip));
- System.out.println("----------------------------");
- }
-
-
-}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/IPKnowledgeBase.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/IPKnowledgeBase.java
deleted file mode 100644
index 4f7aabf..0000000
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/IPKnowledgeBase.java
+++ /dev/null
@@ -1,104 +0,0 @@
-package com.geedgenetworks.core.utils;
-
-
-import com.geedgenetworks.core.pojo.KnowLedgeEntity;
-import com.geedgenetworks.utils.IpLookupV2;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.*;
-
-
-public class IPKnowledgeBase {
-
- public static Map<String, IpLookupV2> venderWithIpLookup = new HashMap<>();
- private static final Logger logger = LoggerFactory.getLogger(IPKnowledgeBase.class);
-
-
-
- private static IPKnowledgeBase instance;
-
- // 私有构造函数,防止外部实例化
- public IPKnowledgeBase() {
-
- }
-
- public static synchronized IPKnowledgeBase getInstance(Map<String, KnowLedgeEntity> map) throws Exception {
- if (instance == null) {
- updateKnowledgeBase(map);
- instance = new IPKnowledgeBase( );
- }
- return instance;
- }
-
- public static void updateKnowledgeBase(Map<String, KnowLedgeEntity> map) throws Exception {
-
- /* for (Map.Entry<String, KnowLedgeEntity> entry : map.entrySet()) {
- IpLookupV2.Builder ipLookupBuilder = new IpLookupV2.Builder(false);
-
- try {
- if ("ip_v4_built_in_id".equals(entry.getKey())) {
- ipLookupBuilder.loadDataFileV4(
- entry.getValue().getInputStream());
- }
- if ("ip_v6_built_in_id".equals(entry.getKey())) {
- ipLookupBuilder.loadDataFileV6(
- entry.getValue().getInputStream());
- }
- if ("ip_v4_user_defined_id".equals(entry.getKey())) {
- ipLookupBuilder.loadDataFilePrivateV4(
- entry.getValue().getInputStream());
- }
- if ("ip_v6_user_defined_id".equals(entry.getKey())) {
- ipLookupBuilder.loadDataFilePrivateV6(
- entry.getValue().getInputStream());
- }
- } catch (Exception e) {
- logger.error(
- "加载ip知识库"
- + entry.getKey()
- + ":"
- + entry.getValue()
- + "失败 "
- + e.toString());
-
-
- }
- IpLookupV2 ipLookup = ipLookupBuilder.build();
- venderWithIpLookup.put(entry.getKey(), ipLookup);
- }*/
-
- }
-
- public static void main(String[] args) throws Exception {
- String ip = "114.64.231.114";
-
- test(ip);
- }
-
- private static void test(String ip) {
-
- IpLookupV2 ipLookup = venderWithIpLookup.get("TSG");
- System.out.println("cityLookupDetail:" + ipLookup.cityLookupDetail(ip));
- System.out.println("cityLookup:" + ipLookup.cityLookup(ip));
- System.out.println("countryLookup:" + ipLookup.countryLookup(ip));
- System.out.println("locationLookupDetail:" + ipLookup.locationLookupDetail(ip));
- System.out.println("provinceLookup:" + ipLookup.provinceLookup(ip));
- System.out.println("----------------------------");
-
- long start1 = System.currentTimeMillis();
- System.out.println(
- "administrativeAreaLookupDetail:" + ipLookup.administrativeAreaLookupDetail(ip));
- System.out.println("asnLookupDetail:" + ipLookup.asnLookupDetail(ip));
- System.out.println("cityLookupDetail:" + ipLookup.cityLookupDetail(ip));
- System.out.println("locationLookupDetail:" + ipLookup.locationLookupDetail(ip));
- System.out.println("ispLookup:" + ipLookup.ispLookup(ip));
- System.out.println("cityLatLngLookup:" + ipLookup.cityLatLngLookup(ip));
- long end1 = System.currentTimeMillis();
- System.out.println(end1 - start1);
-
- System.out.println("--------------------------------------");
- System.out.println("infoLookupToCSV:" + ipLookup.infoLookupToCSV(ip));
- System.out.println("infoLookupToJson:" + ipLookup.ispLookup(ip));
- System.out.println("----------------------------");
- }
-}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/IpLookupUtils.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/IpLookupUtils.java
deleted file mode 100644
index 06a6e48..0000000
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/IpLookupUtils.java
+++ /dev/null
@@ -1,246 +0,0 @@
-package com.geedgenetworks.core.utils;
-
-import com.geedgenetworks.common.config.CommonConfig;
-
-import com.geedgenetworks.utils.IpLookupV2;
-import com.geedgenetworks.utils.StringUtil;
-import org.apache.commons.io.IOUtils;
-
-import cn.hutool.core.io.file.FileReader;
-import cn.hutool.crypto.digest.DigestUtil;
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.util.Map;
-
-/**
- * @author qidaijie
- * @version 2022/11/16 15:23
- */
-public class IpLookupUtils {
- private static final Log logger = LogFactory.get();
- private static final String ipv4BuiltInName = "ip_v4_built_in.mmdb";
- private static final String ipv6BuiltInName = "ip_v6_built_in.mmdb";
- private static final String ipv4UserDefinedName = "ip_v4_user_defined.mmdb";
- private static final String ipv6UserDefinedName = "ip_v6_user_defined.mmdb";
- private static final String asnV4Name = "asn_v4.mmdb";
- private static final String asnV6Name = "asn_v6.mmdb";
- private static IpLookupV2 ipLookup;
-
- public IpLookupUtils() {
- ipLookup = initIpLookup();
- }
-
- /**
- * 从HDFS下载文件更新IpLookup
- *
- * @param knowledgeFileCache 广播流下发的元数据信息,包含更新内容
- * @return 更新后的IpLookup
- */
- public static void updateIpLookup(Map<String, String> knowledgeFileCache) {
- IpLookupV2.Builder builder = new IpLookupV2.Builder(false);
- try {
- InputStream ipv4BuiltInFile =
- getUpdateFile(ipv4BuiltInName, knowledgeFileCache.get(ipv4BuiltInName));
- if (ipv4BuiltInFile != null) {
- builder.loadDataFileV4(ipv4BuiltInFile);
- }
-
- InputStream ipv6BuiltInFileFile =
- getUpdateFile(ipv6BuiltInName, knowledgeFileCache.get(ipv6BuiltInName));
- if (ipv6BuiltInFileFile != null) {
- builder.loadDataFileV6(ipv6BuiltInFileFile);
- }
-
- InputStream ipv4UserDefinedFile =
- getUpdateFile(ipv4UserDefinedName, knowledgeFileCache.get(ipv4UserDefinedName));
- if (ipv4UserDefinedFile != null) {
- builder.loadDataFilePrivateV4(ipv4UserDefinedFile);
- }
-
- InputStream ipv6UserDefinedFile =
- getUpdateFile(ipv6UserDefinedName, knowledgeFileCache.get(ipv6UserDefinedName));
- if (ipv6UserDefinedFile != null) {
- builder.loadDataFilePrivateV6(ipv6UserDefinedFile);
- }
-
- InputStream asnV4File = getUpdateFile(asnV4Name, knowledgeFileCache.get(asnV4Name));
- if (asnV4File != null) {
- builder.loadAsnDataFileV4(asnV4File);
- }
-
- InputStream asnV6File = getUpdateFile(asnV6Name, knowledgeFileCache.get(asnV6Name));
- if (asnV6File != null) {
- builder.loadAsnDataFileV6(asnV6File);
- }
- ipLookup = builder.build();
- } catch (Exception e) {
-
- e.printStackTrace();
- }
- }
-
- /**
- * 从HDFS下载文件初始化IpLookup
- *
- * @return 初始化的IpLookup
- */
- public static IpLookupV2 initIpLookup() {
- IpLookupV2.Builder builder = new IpLookupV2.Builder(false);
- InputStream ipv4BuiltInFile = getInitFile(ipv4BuiltInName);
- if (ipv4BuiltInFile != null) {
- builder.loadDataFileV4(ipv4BuiltInFile);
- }
-
- InputStream ipv6BuiltInFileFile = getInitFile(ipv6BuiltInName);
- if (ipv6BuiltInFileFile != null) {
- builder.loadDataFileV6(ipv6BuiltInFileFile);
- }
-
- InputStream ipv4UserDefinedFile = getInitFile(ipv4UserDefinedName);
- if (ipv4UserDefinedFile != null) {
- builder.loadDataFilePrivateV4(ipv4UserDefinedFile);
- }
-
- InputStream ipv6UserDefinedFile = getInitFile(ipv6UserDefinedName);
- if (ipv6UserDefinedFile != null) {
- builder.loadDataFilePrivateV6(ipv6UserDefinedFile);
- }
-
- InputStream asnV4File = getInitFile(asnV4Name);
- if (asnV4File != null) {
- builder.loadAsnDataFileV4(asnV4File);
- }
-
- InputStream asnV6File = getInitFile(asnV6Name);
- if (asnV6File != null) {
- builder.loadAsnDataFileV6(asnV6File);
- }
- return builder.build();
- }
-
- /**
- * 下载更新文件并校验完整性
- *
- * @param fileName 文件名
- * @param sha256 文件校验码
- * @return true or false
- */
- private static InputStream getUpdateFile(String fileName, String sha256) {
- InputStream inputStream = null;
- try {
- if (StringUtil.isNotBlank(sha256)) {
- byte[] downloadFileBytes = downloadFile(fileName);
- if (downloadFileBytes != null) {
- String downloadFileSha256Hex = DigestUtil.sha256Hex(downloadFileBytes);
- if (sha256.equals(downloadFileSha256Hex)) {
- inputStream = new ByteArrayInputStream(downloadFileBytes);
- } else {
- logger.error("The downloaded file {} sha256 is inconsistent!", fileName);
- }
- }
- } else {
- byte[] downloadFileBytes = downloadFile(fileName);
- if (downloadFileBytes != null) {
- inputStream = new ByteArrayInputStream(downloadFileBytes);
- }
- }
- } catch (RuntimeException e) {
- logger.error(
- "Failed to download the {} file for update cache, Exception message:{}",
- fileName,
- e.getMessage());
- } finally {
- IOUtils.closeQuietly(inputStream);
- }
- return inputStream;
- }
-
- /**
- * 下载文件(初始化)
- *
- * @param fileName 文件名
- * @return 文件io流
- */
- private static InputStream getInitFile(String fileName) {
- InputStream inputStream = null;
- try {
- byte[] fileBytes = downloadFile(fileName);
- if (fileBytes != null) {
- inputStream = new ByteArrayInputStream(fileBytes);
- }
- } catch (RuntimeException e) {
- logger.error(
- "Failed to download the {} file for initialization cache, The exception message is:{}",
- fileName,
- e.getMessage());
- e.printStackTrace();
- } finally {
- IOUtils.closeQuietly(inputStream);
- }
- return inputStream;
- }
-
- /**
- * 根据不同文件系统获取定位库io流
- *
- * @param fileName 文件名
- * @return 文件io流
- */
- private static byte[] downloadFile(String fileName) {
- byte[] fileBytes = null;
- try {
- String filePath = CommonConfig.KNOWLEDGEBASE_FILE_STORAGE_PATH + fileName;
- fileBytes =
- "hdfs".equals(CommonConfig.KNOWLEDGEBASE_FILE_STORAGE_TYPE)
- ? HadoopUtils.downloadFileByBytes(filePath)
- : new FileReader(filePath).readBytes();
- } catch (RuntimeException e) {
- logger.error(
- "Failed to download file {} from {}, The exception message is:{}",
- CommonConfig.KNOWLEDGEBASE_FILE_STORAGE_TYPE,
- fileName,
- e.getMessage());
- e.printStackTrace();
- }
- return fileBytes;
- }
-
- public static String getGeoIpDetail(String ip) {
- String detail = "";
- try {
- detail = ipLookup.cityLookupDetail(ip);
- } catch (NullPointerException npe) {
- logger.error("The IP Location MMDB file is not loaded or IP is null! " + npe);
- } catch (RuntimeException e) {
- logger.error("Get clientIP location error! " + e.getMessage());
- }
- return detail;
- }
-
- public static String getGeoIpCountry(String ip) {
- String country = "";
- try {
- country = ipLookup.countryLookup(ip);
- } catch (NullPointerException npe) {
- logger.error("The IP Location MMDB file is not loaded or IP is null! " + npe);
- } catch (RuntimeException e) {
- logger.error("Get ServerIP location error! " + e.getMessage());
- }
- return country;
- }
-
- public static String getGeoAsn(String ip) {
- String asn = "";
- try {
- asn = ipLookup.asnLookup(ip);
- } catch (NullPointerException npe) {
- logger.error("The ASN MMDB file is not loaded or IP is null! " + npe);
- } catch (RuntimeException e) {
- logger.error("Get IP ASN error! " + e.getMessage());
- }
- return asn;
- }
-}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java
new file mode 100644
index 0000000..00b2b53
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java
@@ -0,0 +1,78 @@
+package com.geedgenetworks.core.utils.KnowlegdeBase;
+
+import com.geedgenetworks.common.config.KnowledgeConfig;
+import com.geedgenetworks.common.utils.HttpClientUtils;
+import com.geedgenetworks.core.pojo.KnowLedgeEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+public abstract class AbstractKnowledgeBase {
+
+ private static final Logger logger = LoggerFactory.getLogger(AbstractKnowledgeBase.class);
+
+ protected AbstractKnowledgeBase() {
+ // 抽象类的构造函数
+ }
+
+ abstract Boolean updateKnowledgeBase(KnowledgeConfig knowledgeConfig) ;
+
+ abstract String functionName();
+
+
+ static String calculateSHA256(byte[] data) {
+ MessageDigest digest = null;
+ StringBuilder result = new StringBuilder();
+ try {
+ digest = MessageDigest.getInstance("SHA-256");
+ for (byte b : digest.digest(data)) {
+ result.append(String.format("%02x", b));
+ }
+ } catch (NoSuchAlgorithmException e) {
+ logger.error("calculateSHA256 error: " + e.getMessage());
+ }
+ return result.toString();
+ }
+
+
+ static byte[] downloadFile(String fileUrl, int isValid) {
+ byte[] inputStream = null;
+ if (isValid == 1) {
+ try {
+ long start = System.currentTimeMillis();
+ HttpClientUtils httpClientUtils = new HttpClientUtils();
+ inputStream = httpClientUtils.httpGetByte(fileUrl, 3000);
+ long end = System.currentTimeMillis();
+ logger.info(
+ "download file " + fileUrl + " finished, speed " + (end - start) + " ms");
+ } catch (Exception e) {
+ logger.error("download file " + fileUrl + " error: " + e.getMessage());
+ }
+ }
+ return inputStream;
+ }
+
+ static KnowLedgeEntity getMetadata(String id) {
+
+ try {
+// HttpClientUtils httpClientUtils = new HttpClientUtils();
+ // String metadate = httpClientUtils.httpGet(URI.create("/v1/knowledge_base/"+ id +"/meta")) ;
+//临时写死
+ KnowLedgeEntity knowLedgeEntity = new KnowLedgeEntity();
+ knowLedgeEntity.setIsValid(1);
+ knowLedgeEntity.setSha256("c06db87e9a914a8296ca892880c353884ac15b831c485c29fad9889ffb4e0fa8");
+ knowLedgeEntity.setPath("http://192.168.44.12:9098/hos/knowledge_base_hos_bucket/4bd16ddb-d1cb-4310-b620-02164335c50d-aXBfYnVpbHRpbg==.mmdb");
+ /* knowLedgeEntity.setSha256("2a2defd9c96ea5434ac628d3dc4fc6790e30d84c218b02fe798dbc6772dbf76c");
+ knowLedgeEntity.setPath("http://192.168.44.12:9098/hos/knowledge_base_hos_bucket/1e8aabe7-2170-445c-9124-6051332ca7d6-YXNuX2J1aWx0aW4=.mmdb");*/
+ // List<KnowLedgeEntity> knowLedgeEntityList = JSON.parseObject(metadate, new TypeReference<List<KnowLedgeEntity>>() {});
+ // return knowLedgeEntityList.get(0);
+ return knowLedgeEntity;
+ } catch (Exception e) {
+ logger.error("get file Metadata " + id + " error: " + e.getMessage());
+ }
+
+ return null;
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java
new file mode 100644
index 0000000..a62acf3
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java
@@ -0,0 +1,106 @@
+package com.geedgenetworks.core.utils.KnowlegdeBase;
+
+
+import com.geedgenetworks.common.config.KnowledgeConfig;
+
+import com.geedgenetworks.core.pojo.KnowLedgeEntity;
+import com.geedgenetworks.utils.IpLookupV2;
+import lombok.Getter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class AsnKnowledgeBase extends AbstractKnowledgeBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AsnKnowledgeBase.class);
+ @Getter
+ private static Map<String, IpLookupV2> venderWithAsnLookup = new ConcurrentHashMap<>();
+ private static final Logger logger = LoggerFactory.getLogger(AsnKnowledgeBase.class);
+ private static AsnKnowledgeBase instance;
+ // 私有构造函数,防止外部实例化
+
+ public static synchronized AsnKnowledgeBase getInstance() {
+ if (instance == null) {
+ instance = new AsnKnowledgeBase();
+ }
+ return instance;
+ }
+
+
+
+ public Boolean updateKnowledgeBase(KnowledgeConfig knowledgeConfig) {
+
+ IpLookupV2.Builder asnLookupBuilder = new IpLookupV2.Builder(false);
+
+ for (int i = 0; i < knowledgeConfig.getFiles().size(); i++) {
+ try {
+ KnowLedgeEntity knowLedgeEntity = getMetadata(knowledgeConfig.getFiles().get(i));
+ byte[] fileByte = downloadFile(knowLedgeEntity.getPath(), knowLedgeEntity.getIsValid());
+ if (calculateSHA256(fileByte).equals(knowLedgeEntity.getSha256())) {
+ switch (i) {
+ case 0:
+ asnLookupBuilder.loadAsnDataFile(new ByteArrayInputStream(fileByte));
+ break;
+ case 1:
+ asnLookupBuilder.loadDataFilePrivate(new ByteArrayInputStream(fileByte));
+ break;
+ }
+ } else {
+ logger.error("check file sha256 error ");
+ return false;
+ }
+ } catch (Exception e) {
+ logger.error("updateKnowledgeBase error " + e.getMessage());
+ return false;
+ }
+ }
+ IpLookupV2 ipLookup = asnLookupBuilder.build();
+ venderWithAsnLookup.put(knowledgeConfig.getName(), ipLookup);
+ return true;
+ }
+ @Override
+ public String functionName() {
+ return "asnlookup";
+ }
+
+
+
+
+
+
+
+
+
+ private static void test(String ip) {
+
+ IpLookupV2 ipLookup = venderWithAsnLookup.get("TSG");
+ System.out.println("cityLookupDetail:" + ipLookup.cityLookupDetail(ip));
+ System.out.println("cityLookup:" + ipLookup.cityLookup(ip));
+ System.out.println("countryLookup:" + ipLookup.countryLookup(ip));
+ System.out.println("locationLookupDetail:" + ipLookup.locationLookupDetail(ip));
+ System.out.println("provinceLookup:" + ipLookup.provinceLookup(ip));
+ System.out.println("----------------------------");
+
+ long start1 = System.currentTimeMillis();
+ System.out.println(
+ "administrativeAreaLookupDetail:" + ipLookup.administrativeAreaLookupDetail(ip));
+ System.out.println("asnLookupDetail:" + ipLookup.asnLookupDetail(ip));
+ System.out.println("cityLookupDetail:" + ipLookup.cityLookupDetail(ip));
+ System.out.println("locationLookupDetail:" + ipLookup.locationLookupDetail(ip));
+ System.out.println("ispLookup:" + ipLookup.ispLookup(ip));
+ System.out.println("cityLatLngLookup:" + ipLookup.cityLatLngLookup(ip));
+ long end1 = System.currentTimeMillis();
+ System.out.println(end1 - start1);
+
+ System.out.println("--------------------------------------");
+ System.out.println("infoLookupToCSV:" + ipLookup.infoLookupToCSV(ip));
+ System.out.println("infoLookupToJson:" + ipLookup.ispLookup(ip));
+ System.out.println("----------------------------");
+ }
+
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/IpKnowledgeBase.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/IpKnowledgeBase.java
new file mode 100644
index 0000000..2dbba13
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/IpKnowledgeBase.java
@@ -0,0 +1,64 @@
+package com.geedgenetworks.core.utils.KnowlegdeBase;
+
+
+import com.geedgenetworks.common.config.KnowledgeConfig;
+import com.geedgenetworks.core.pojo.KnowLedgeEntity;
+import com.geedgenetworks.utils.IpLookupV2;
+import lombok.Getter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.util.*;
+
+
+public class IpKnowledgeBase extends AbstractKnowledgeBase {
+
+ @Getter
+ private static Map<String, IpLookupV2> venderWithIpLookup = new HashMap<>();
+ private static final Logger logger = LoggerFactory.getLogger(IpKnowledgeBase.class);
+ private static IpKnowledgeBase instance;
+
+ public static synchronized IpKnowledgeBase getInstance() {
+ if (instance == null) {
+ instance = new IpKnowledgeBase();
+ }
+ return instance;
+ }
+ @Override
+ public Boolean updateKnowledgeBase(KnowledgeConfig knowledgeConfig) {
+ IpLookupV2.Builder ipLookupBuilder = new IpLookupV2.Builder(false);
+
+ for (int i = 0; i < knowledgeConfig.getFiles().size(); i++) {
+ try {
+ KnowLedgeEntity knowLedgeEntity = getMetadata(knowledgeConfig.getFiles().get(i));
+
+
+ byte[] fileByte = downloadFile(knowLedgeEntity.getPath(), knowLedgeEntity.getIsValid());
+ if (calculateSHA256(fileByte).equals(knowLedgeEntity.getSha256())) {
+ switch (i) {
+ case 0:
+ ipLookupBuilder.loadDataFile(new ByteArrayInputStream(fileByte));
+ break;
+ case 1:
+ ipLookupBuilder.loadDataFilePrivate(new ByteArrayInputStream(fileByte));
+ break;
+ }
+ } else {
+ logger.error("check file sha256 error ");
+ return false;
+ }
+ } catch (Exception e) {
+ logger.error("updateKnowledgeBase error " + e.getMessage());
+ return false;
+ }
+ }
+ IpLookupV2 ipLookup = ipLookupBuilder.build();
+ venderWithIpLookup.put(knowledgeConfig.getName(), ipLookup);
+ return true; }
+
+ @Override
+ public String functionName() {
+ return "geoiplookup";
+ }
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowledgeBaseSchedule.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseSchedule.java
index e7c3963..ccbf1a5 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowledgeBaseSchedule.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseSchedule.java
@@ -1,27 +1,24 @@
-package com.geedgenetworks.core.utils;
+package com.geedgenetworks.core.utils.KnowlegdeBase;
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.TypeReference;
import com.geedgenetworks.common.config.CommonConfig;
import com.geedgenetworks.common.config.KnowledgeConfig;
-import com.geedgenetworks.common.utils.HttpClientUtils;
import com.geedgenetworks.core.pojo.KnowLedgeEntity;
import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.net.URI;
+import java.lang.reflect.Method;
import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
-import static com.geedgenetworks.core.utils.ScheduTask.startTask;
public class KnowledgeBaseSchedule {
+ private static Timer timer;
+
public static Map<String, KnowledgeConfig> knowledgeConfigMap = new ConcurrentHashMap<>();
public static Map<String, List<KnowLedgeEntity>> KnowLedgeEntityMap = new ConcurrentHashMap<>();
@Getter
@@ -30,51 +27,57 @@ public class KnowledgeBaseSchedule {
private static KnowledgeBaseSchedule instance;
- public static synchronized KnowledgeBaseSchedule getInstance() throws Exception {
+ public static synchronized KnowledgeBaseSchedule getInstance() {
if (instance == null) {
instance = new KnowledgeBaseSchedule();
knowledgeBaseClassReflect =CommonConfig.KNOWLEDGE_BASE_CLASS_REFLECT;
- startTask();
+ ScheduTask.startTask();
}
return instance;
}
- public static void updateKnowledgeBase(String name) throws Exception {
+ public static void updateKnowledgeBase(String name) {
try {
// 获取私有构造方法
Class<?> cls = Class.forName(knowledgeBaseClassReflect.get(knowledgeConfigMap.get(name).getType()));
Constructor<?> constructor = cls.getDeclaredConstructor();
constructor.setAccessible(true);
- knowledgeUtil instance = (knowledgeUtil) constructor.newInstance();
+ // knowledgeUtil instance = (knowledgeUtil) constructor.newInstance();
+ Method createInstanceMethod = cls.getMethod("getInstance");
+ AbstractKnowledgeBase instance = (AbstractKnowledgeBase) createInstanceMethod.invoke(null);
// 使用实例调用方法
instance.updateKnowledgeBase(knowledgeConfigMap.get(name));
- } catch (NoSuchMethodException | InstantiationException |
- IllegalAccessException | InvocationTargetException e) {
+ } catch (Exception e) {
logger.error(e.getMessage());
}
}
- public static synchronized void initKnowledgeBase(KnowledgeConfig KnowledgeConfig) {
+ public static synchronized void initKnowledgeBase(KnowledgeConfig KnowledgeConfig) {
- if (!knowledgeConfigMap.containsKey(KnowledgeConfig.getName())) {
- List<KnowLedgeEntity> knowLedgeEntityList = new ArrayList<>();
- try {
- knowledgeConfigMap.put(KnowledgeConfig.getName(), KnowledgeConfig);
- updateKnowledgeBase(KnowledgeConfig.getName());
- for (String id : KnowledgeConfig.getFiles()) {
- KnowLedgeEntity knowLedgeEntity = getMetadata(id);
- knowLedgeEntityList.add(knowLedgeEntity);
+ if (instance == null) {
+ getInstance();
+ }
+ if (!knowledgeConfigMap.containsKey(KnowledgeConfig.getName())) {
+ List<KnowLedgeEntity> knowLedgeEntityList = new ArrayList<>();
+ try {
+ knowledgeConfigMap.put(KnowledgeConfig.getName(), KnowledgeConfig);
+ updateKnowledgeBase(KnowledgeConfig.getName());
+ for (String id : KnowledgeConfig.getFiles()) {
+ KnowLedgeEntity knowLedgeEntity = getMetadata(id);
+ knowLedgeEntityList.add(knowLedgeEntity);
+ }
+ KnowLedgeEntityMap.put(KnowledgeConfig.getName(), knowLedgeEntityList);
+ } catch (Exception e) {
+ logger.error("initKnowledgeBase " + KnowledgeConfig.getName() + " error: " + e.getMessage());
}
KnowLedgeEntityMap.put(KnowledgeConfig.getName(), knowLedgeEntityList);
- } catch (Exception e) {
- logger.error("initKnowledgeBase " + KnowledgeConfig.getName() + " error: " + e.getMessage());
}
- KnowLedgeEntityMap.put(KnowledgeConfig.getName(), knowLedgeEntityList);
- }
+
+
}
private static KnowLedgeEntity getMetadata(String id) {
@@ -124,8 +127,37 @@ public class KnowledgeBaseSchedule {
+ static class ScheduTask extends TimerTask {
+ @Override
+ public void run() {
+ for (Map.Entry<String, KnowledgeConfig> entry : knowledgeConfigMap.entrySet()) {
+ Boolean flag = null;
+ try {
+ flag = checkUpdateMesssage(entry.getValue().getName());
+ if(flag){
+ updateKnowledgeBase(entry.getKey());
+ }
+ } catch (NoSuchAlgorithmException e) {
+ logger.error(e.getMessage());
+ } catch (Exception e) {
+ logger.error(e.getMessage());
+ }
+ }
+ }
+
+ public static void startTask() {
+ timer = new Timer();
+ timer.schedule(new ScheduTask(), 0, 300000); // 每隔300秒执行一次任务
+ }
+ public static void stopTask() {
+ if (timer != null) {
+ timer.cancel();
+ timer = null;
+ }
+ }
+ }
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/ScheduTask.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/ScheduTask.java
deleted file mode 100644
index 78e43d7..0000000
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/ScheduTask.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package com.geedgenetworks.core.utils;
-
-import com.geedgenetworks.common.config.KnowledgeConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.security.NoSuchAlgorithmException;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import static com.geedgenetworks.core.utils.KnowledgeBaseSchedule.*;
-
-public class ScheduTask extends TimerTask {
-
- private static Timer timer;
- private static final Logger logger = LoggerFactory.getLogger(ScheduTask.class);
-
- @Override
- public void run() {
- for (Map.Entry<String, KnowledgeConfig> entry : knowledgeConfigMap.entrySet()) {
- Boolean flag = null;
- try {
- flag = checkUpdateMesssage(entry.getValue().getName());
- if(flag){
- updateKnowledgeBase(entry.getKey());
- }
- } catch (NoSuchAlgorithmException e) {
- logger.error(e.getMessage());
- } catch (Exception e) {
- logger.error(e.getMessage());
- }
- }
- //System.out.println("Static task is running...");
- }
-
- public static void startTask() {
- timer = new Timer();
- timer.schedule(new ScheduTask(), 0, 300000); // 每隔300秒执行一次任务
- }
-
- public static void stopTask() {
- if (timer != null) {
- timer.cancel();
- timer = null;
- }
- }
-}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/knowledgeUtil.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/knowledgeUtil.java
deleted file mode 100644
index 9c9aef4..0000000
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/knowledgeUtil.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package com.geedgenetworks.core.utils;
-
-import com.geedgenetworks.common.config.KnowledgeConfig;
-
-public abstract class knowledgeUtil {
-
-
- abstract Boolean updateKnowledgeBase(KnowledgeConfig knowledgeConfig) throws Exception;
-
- abstract String functionName();
-}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/knowledgeUtils.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/knowledgeUtils.java
deleted file mode 100644
index 24dd632..0000000
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/knowledgeUtils.java
+++ /dev/null
@@ -1,200 +0,0 @@
-package com.geedgenetworks.core.utils;
-
-import com.alibaba.nacos.api.NacosFactory;
-import com.alibaba.nacos.api.PropertyKeyConst;
-import com.alibaba.nacos.api.config.ConfigService;
-import com.alibaba.nacos.api.config.listener.Listener;
-import com.alibaba.nacos.api.exception.NacosException;
-import com.geedgenetworks.common.config.CommonConfig;
-import com.geedgenetworks.common.utils.HttpClientUtils;
-import com.geedgenetworks.core.pojo.KnowLedgeEntity;
-import com.geedgenetworks.utils.StringUtil;
-import com.jayway.jsonpath.JsonPath;
-import org.apache.http.Header;
-import org.apache.http.message.BasicHeader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.InputStream;
-import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.concurrent.Executor;
-
-public class knowledgeUtils {
-
- private static final String KNOWLEDGE_EXPR;
- private static Map<String, String> knowLedgeFileSha256 = new HashMap<>();
- private static final Logger logger = LoggerFactory.getLogger(knowledgeUtils.class);
-
- static {
- StringBuilder sb = new StringBuilder("");
- sb.append(
- "$.[?(@.version=='latest' && @.name in ['ip_v4_built_in','ip_v6_built_in','ip_v4_user_defined','ip_v6_user_defined','asn_v4','asn_v6'] && @.id in ['");
- // sb.append(commonConfig.ID_ARRAY);
- /* sb.append(commonConfig.IPV4_USER_DEFINED_ID);
- sb.append("','");
- sb.append(commonConfig.IPV6_USER_DEFINED_ID);
- sb.append("','");
- sb.append(commonConfig.IPV4_BUILT_IN_ID);
- sb.append("','");
- sb.append(commonConfig.IPV6_BUILT_IN_ID);
- sb.append("','");
- sb.append(commonConfig.ASN_V4_ID);
- sb.append("','");
- sb.append(commonConfig.ASN_V6_ID);*/
- sb.append("'])].['id','name','sha256','format','path','type','isValid']");
- KNOWLEDGE_EXPR = sb.toString();
- System.out.println("listening nacos config");
- Properties properties = new Properties();
- properties.put(PropertyKeyConst.SERVER_ADDR, CommonConfig.NACOS_SERVER_ADDR);
- properties.setProperty(PropertyKeyConst.USERNAME, CommonConfig.NACOS_USERNAME);
- properties.setProperty(PropertyKeyConst.PASSWORD, CommonConfig.NACOS_PASSWORD);
- if (!StringUtil.isEmpty(CommonConfig.NACOS_NAMESPACE)) {
- properties.setProperty(PropertyKeyConst.NAMESPACE, CommonConfig.NACOS_NAMESPACE);
- }
- Properties nacosProperties = properties;
- String dataId = CommonConfig.NACOS_DATA_ID;
- String group = CommonConfig.NACOS_GROUP;
- long readTimeout = CommonConfig.NACOS_READ_TIMEOUT;
- // 初始化定位库缓存
- logger.info("connect nacos:" + nacosProperties.getProperty(PropertyKeyConst.SERVER_ADDR));
- try {
- ConfigService configService = NacosFactory.createConfigService(nacosProperties);
- String nacosInfo = configService.getConfig(dataId, group, readTimeout);
- processNacosInfo(nacosInfo);
-
- configService.addListener(
- dataId,
- group,
- new Listener() {
- @Override
- public Executor getExecutor() {
- return null;
- }
-
- @Override
- public void receiveConfigInfo(String nacosInfo) {
- processNacosInfo(nacosInfo);
- }
- });
- } catch (NacosException e) {
- e.printStackTrace();
- }
- }
-
- public static void initKnowledge() {}
-
- private static void processNacosInfo(String nacosInfo) {
- SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- String currentDate = formatter.format(new Date());
- System.out.println(currentDate + ": receive nacos config:" + nacosInfo);
- ArrayList<Map<String, Object>> metaList = JsonPath.parse(nacosInfo).read(KNOWLEDGE_EXPR);
- System.out.println(
- "resolve " + metaList.size() + " metadata from the config info pushed by nacos");
- loadKnowledge(metaList);
- }
-
- private static void loadKnowledge(ArrayList<Map<String, Object>> metaList) {
- Map<String, KnowLedgeEntity> updateMap = new HashMap<>();
- /*
- List<String> tagTypes = new ArrayList<>(3);
- tagTypes.add("cn_ip_tag_user_defined");
- tagTypes.add("cn_domain_tag_user_defined");
- tagTypes.add("cn_app_tag_user_defined");*/
-
- for (Map<String, Object> metadata : metaList) {
-
- String sha256 = (String) metadata.get("sha256");
- String fileUrl = (String) metadata.get("path");
- String id = (String) metadata.get("id");
- int isValid = (Integer) metadata.get("isValid");
-
- if (isValid != 1) {
- sha256 = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
- }
-
- /* try {
- if (!knowLedgeFileSha256.containsKey(id)) {
- byte[] downloadBytes = downloadFile(fileUrl, sha256, isValid);
- KnowLedgeEntity knowLedgeEntity = new KnowLedgeEntity();
- knowLedgeEntity.setDownloadBytes(downloadBytes);
- knowLedgeEntity.setIsValid(isValid);
- knowLedgeEntity.setSha256(sha256);
- knowLedgeFileSha256.put(id, sha256);
- updateMap.put(id, knowLedgeEntity);
-
- } else {
- if (!knowLedgeFileSha256.get(id).equals(sha256)) {
- byte[] downloadBytes = downloadFile(fileUrl, sha256, isValid);
- KnowLedgeEntity knowLedgeEntity = new KnowLedgeEntity();
- knowLedgeEntity.setDownloadBytes(downloadBytes);
- knowLedgeEntity.setIsValid(isValid);
- knowLedgeEntity.setSha256(sha256);
- knowLedgeFileSha256.put(id, sha256);
- updateMap.put(id, knowLedgeEntity);
-
- }
-
-
- }
-
-
-
- } catch (Exception e) {
- System.out.println("process file " + id + " error, error message:" + e.getMessage());
- e.printStackTrace();
- }*/
- }
- if (updateMap.size() > 0) {
-
- // IPUtils.loadIpLook(updateMap);
- // csvUtils.loadIpLook(updateMap);
- }
- }
-
- private static InputStream downloadFile(String fileUrl) {
- InputStream inputStream = null;
- byte[] content = new byte[0];
- try {
- long start = System.currentTimeMillis();
- Header header = new BasicHeader("token", CommonConfig.HOS_TOKEN);
- HttpClientUtils httpClientUtils = new HttpClientUtils();
- inputStream = httpClientUtils.httpGetInputStream(fileUrl, 3000, header);
- // content = IOUtils.toByteArray(inputStream);
- long end = System.currentTimeMillis();
- System.out.println(
- "download file " + fileUrl + " finished, speed " + (end - start) + " ms");
- } catch (Exception e) {
- System.out.println("download file " + fileUrl + " error: " + e.getMessage());
- } finally {
- // IOUtils.closeQuietly(inputStream);
- }
- return inputStream;
- }
-
- /* private static byte[] downloadFile(String fileUrl, String sha256, int isValid) {
- byte[] content = new byte[0];
- if (isValid != 1) {
- return content;
- }
- String downloadFileSha256;
- int downloadCount = 0;
- do {
- content = downloadFile(fileUrl);
- // downloadFileSha256 = DigestUtil.sha256Hex(content);
- downloadCount++;
- } while (!downloadFileSha256.equals(sha256) && downloadCount < commonConfig.KNOWLEDGE_FILE_CHECK_NUMBER);
- if (downloadCount >= commonConfig.KNOWLEDGE_FILE_CHECK_NUMBER) {
- System.out.println("warning: file url: " + fileUrl + " download more than specified number of times");
- }
- return content;
- }*/
-
- public static void main(String[] args) throws InterruptedException {
- System.out.println(knowLedgeFileSha256);
- System.out.println(knowLedgeFileSha256);
- while (true) {
- Thread.sleep(10000);
- }
- }
-}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java
index be1e892..a4b00c1 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/AsnLookupFunctionTest.java
@@ -4,7 +4,7 @@ import com.geedgenetworks.common.config.KnowledgeConfig;
import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.core.pojo.UDFContext;
import com.geedgenetworks.core.udf.AsnLookup;
-import com.geedgenetworks.core.utils.ASNKnowledgeBase;
+import com.geedgenetworks.core.utils.KnowlegdeBase.AsnKnowledgeBase;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -31,6 +31,8 @@ public class AsnLookupFunctionTest {
udfContext.setOutput_fields(Arrays.asList("asn"));
}
+
+
@Test
public void testInit(){
AsnLookup asnLookup = new AsnLookup();
@@ -68,13 +70,13 @@ public class AsnLookupFunctionTest {
knowledgeConfig.setName("tsg_asnlookup");
knowledgeConfig.setType("asnlookup");
knowledgeConfig.setFiles(Arrays.asList("7ce2f9890950ba90-fcc25696bf11a8a0","7ce2f9890950ba90-71f13b3736863ddb"));
- ASNKnowledgeBase asnKnowledgeBase =ASNKnowledgeBase.getInstance();
+ AsnKnowledgeBase asnKnowledgeBase = AsnKnowledgeBase.getInstance();
asnKnowledgeBase.updateKnowledgeBase(knowledgeConfig);
- String asn = ASNKnowledgeBase.getVenderWithAsnLookup()
+ String asn = AsnKnowledgeBase.getVenderWithAsnLookup()
.get("tsg_asnlookup")
.asnLookup("2600:1015:b002::");
- assertEquals("6167", asn);
+ // assertEquals("6167", asn);
}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java
new file mode 100644
index 0000000..ab23331
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java
@@ -0,0 +1,105 @@
+package com.geedgenetworks.core.udf.test;
+
+import com.geedgenetworks.common.config.KnowledgeConfig;
+import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
+import com.geedgenetworks.core.pojo.UDFContext;
+import com.geedgenetworks.core.udf.GeoIpLookup;
+import com.geedgenetworks.core.utils.KnowlegdeBase.IpKnowledgeBase;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class GeoIpLookupFunctionTest {
+
+ private static UDFContext udfContext;
+
+ private static Map<String, Object> parameters ;
+
+ @BeforeAll
+ public static void setUp() {
+ udfContext = new UDFContext();
+ parameters = new HashMap<>();
+ udfContext.setParameters(parameters);
+ udfContext.setLookup_fields(Arrays.asList("ip"));
+ udfContext.setOutput_fields(Arrays.asList("iplocation"));
+ }
+
+//com.geedgenetworks.core.udf.GeoIpLookup单元测试
+ @Test
+ public void testInit(){
+ GeoIpLookup geoIpLookup = new GeoIpLookup();
+ udfContext.setLookup_fields(new ArrayList<>());
+ udfContext.setParameters(new HashMap<>());
+ udfContext.setParameters(null);
+ Assertions.assertThrows(GrootStreamRuntimeException.class, () -> {
+ geoIpLookup.open(null, udfContext);
+ });
+ udfContext.setParameters(new HashMap<>());
+ udfContext.getParameters().put("option","IP_TO_CITY");
+ Assertions.assertThrows(GrootStreamRuntimeException.class, () -> {
+ geoIpLookup.open(null, udfContext);
+ });
+
+ udfContext.setLookup_fields(new ArrayList<>());
+ udfContext.getLookup_fields().add("v1");
+ udfContext.setOutput_fields(new ArrayList<>());
+ udfContext.getOutput_fields().add("v2");
+ udfContext.setParameters(new HashMap<>());
+ udfContext.getParameters().put("option","other");
+ Assertions.assertThrows(GrootStreamRuntimeException.class, () -> {
+ geoIpLookup.open(null, udfContext);
+ });
+
+ }
+
+
+
+ @Test
+ public void testAsnLookupFunctionIpToAsn() throws Exception {
+
+ /* udfContext.setParameters(new HashMap<>());
+ udfContext.getParameters().put("option","IP_TO_ASN");
+ udfContext.getParameters().put("vendor_id","tsg_asnlookup");*/
+
+ KnowledgeConfig knowledgeConfig =new KnowledgeConfig();
+ knowledgeConfig.setName("tsg_geoiplookup");
+ knowledgeConfig.setType("geoiplookup");
+ knowledgeConfig.setFiles(Arrays.asList("acf1db8589c5e277-ead1a65e1c3973dc","acf1db8589c5e277-ead1a65e1c3973dc"));
+ IpKnowledgeBase ipKnowledgeBase = IpKnowledgeBase.getInstance();
+ ipKnowledgeBase.updateKnowledgeBase(knowledgeConfig);
+ String countryLookup = IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").countryLookup("2600:1015:b002::");
+ String provinceLookup = IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").provinceLookup("2600:1015:b002::");
+ String cityLookup = IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").cityLookup("2600:1015:b002::");
+ String cityLookupDetail = IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").cityLookupDetail("2600:1015:b002::");
+ String locationLookupDetail = IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").locationLookupDetail("2600:1015:b002::");
+ String latLngLookup = IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").latLngLookup("2600:1015:b002::");
+ String ispLookup = IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").ispLookup("2600:1015:b002::");
+ String infoLookupToJSONString = IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").infoLookupToJSONString("2600:1015:b002::");
+ //String infoLookup = (String) IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").infoLookup("2600:1015:b002::");
+
+
+ System.out.println("countryLookup:" + countryLookup);
+ System.out.println("provinceLookup:" + provinceLookup);
+ System.out.println("cityLookup:" + cityLookup);
+ System.out.println("cityLookupDetail:" + cityLookupDetail);
+ System.out.println("locationLookupDetail:" + locationLookupDetail);
+ System.out.println("latLngLookup:" + latLngLookup);
+ System.out.println("ispLookup:" + ispLookup);
+ System.out.println("infoLookupToJSONString:" + infoLookupToJSONString);
+ System.out.println("infoLookup:" + IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").infoLookup("2600:1015:b002::"));
+ System.out.println("----------------------------");
+
+
+ // assertEquals("6167", asn);
+
+ }
+
+
+}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampConverterTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampConverterTest.java
new file mode 100644
index 0000000..4edbba1
--- /dev/null
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/FromUnixTimestampConverterTest.java
@@ -0,0 +1,99 @@
+package com.geedgenetworks.core.udf.test.simple;
+
+import com.geedgenetworks.core.pojo.Event;
+import com.geedgenetworks.core.pojo.UDFContext;
+import com.geedgenetworks.core.udf.FromUnixTimestamp;
+import com.geedgenetworks.core.udf.UnixTimestampConverter;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class FromUnixTimestampConverterTest {
+
+ private static UDFContext udfContext;
+
+ @BeforeAll
+ public static void setUp() {
+ udfContext = new UDFContext();
+ udfContext.setLookup_fields(Arrays.asList("input"));
+ udfContext.setOutput_fields(Arrays.asList("output"));
+ }
+ @Test
+ public void testFromUnixTimestampFunctionMstoS() throws Exception {
+
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("precision", "seconds");
+ udfContext.setParameters(parameters);
+ UnixTimestampConverter unixTimestampConverter = new UnixTimestampConverter();
+ unixTimestampConverter.open(null, udfContext);
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("input", 1577808000000L);
+ event.setExtractedFields(extractedFields);
+ Event result1 = unixTimestampConverter.evaluate(event);
+ assertEquals(1577808000L, result1.getExtractedFields().get("output"));
+
+
+ }
+
+ @Test
+ public void testFromUnixTimestampFunctionStoMs() throws Exception {
+
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("precision", "milliseconds");
+ udfContext.setParameters(parameters);
+ UnixTimestampConverter unixTimestampConverter = new UnixTimestampConverter();
+ unixTimestampConverter.open(null, udfContext);
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("input", 1577808000L);
+ event.setExtractedFields(extractedFields);
+ Event result1 = unixTimestampConverter.evaluate(event);
+ assertEquals(1577808000000L, result1.getExtractedFields().get("output"));
+
+
+ }
+
+
+ @Test
+ public void testFromUnixTimestampFunctionStoS() throws Exception {
+
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("precision", "seconds");
+ udfContext.setParameters(parameters);
+ UnixTimestampConverter unixTimestampConverter = new UnixTimestampConverter();
+ unixTimestampConverter.open(null, udfContext);
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("input", 1577808000L);
+ event.setExtractedFields(extractedFields);
+ Event result1 = unixTimestampConverter.evaluate(event);
+ assertEquals(1577808000L, result1.getExtractedFields().get("output"));
+
+
+ }
+
+
+ @Test
+ public void testFromUnixTimestampFunctionMstoMs() throws Exception {
+
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("precision", "milliseconds");
+ udfContext.setParameters(parameters);
+ UnixTimestampConverter unixTimestampConverter = new UnixTimestampConverter();
+ unixTimestampConverter.open(null, udfContext);
+ Event event = new Event();
+ Map<String, Object> extractedFields = new HashMap<>();
+ extractedFields.put("input", 1577808000000L);
+ event.setExtractedFields(extractedFields);
+ Event result1 = unixTimestampConverter.evaluate(event);
+ assertEquals(1577808000000L, result1.getExtractedFields().get("output"));
+
+
+ }
+}
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/ReNameFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/ReNameFunctionTest.java
index e2e6d66..d55d2a6 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/ReNameFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/simple/ReNameFunctionTest.java
@@ -3,9 +3,6 @@ package com.geedgenetworks.core.udf.test.simple;
import com.geedgenetworks.core.pojo.Event;
import com.geedgenetworks.core.pojo.UDFContext;
import com.geedgenetworks.core.udf.Rename;
-import com.geedgenetworks.core.utils.ASNKnowledgeBase;
-import com.geedgenetworks.core.utils.KnowledgeBaseSchedule;
-import com.geedgenetworks.utils.IpLookupV2;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;