author    wangkuan <[email protected]>  2023-12-12 19:20:58 +0800
committer wangkuan <[email protected]>  2023-12-12 19:20:58 +0800
commit    49ed485e1b8817ca838613f2c548c75f086627cb (patch)
tree      4ff515cf5efe4136aa6fa5c61c65ac433ac20ff9
parent    0452c73c0acbf1e4ac45e46f94350f0313a8d7a0 (diff)
[improve][core][common] Add a scheduled-task utility class, optimize several UDFs, add full feature configuration for the TSG job, and rework grootstream global parameters (for now only the config file is changed; removing `engine` is pending discussion)
-rw-r--r--  config/grootstream.yaml  57
-rw-r--r--  config/grootstream_job_template.yaml  102
-rw-r--r--  groot-common/src/main/java/com/geedgenetworks/common/config/EngineConfig.java  2
-rw-r--r--  groot-common/src/main/java/com/geedgenetworks/common/config/ServerConfigOptions.java  6
-rw-r--r--  groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamDomConfigProcessor.java  26
-rw-r--r--  groot-common/src/main/resources/groot-platform-plugin  4
-rw-r--r--  groot-common/src/main/resources/grootstream.yaml  74
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/pojo/KnowledgeBaseEntity.java  18
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java  12
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java  26
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java  7
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java  8
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java  129
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java  27
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java  2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java  27
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java  2
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseSchedule.java  163
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseUpdateJob.java  191
-rw-r--r--  groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java  5
-rw-r--r--  pom.xml  9
21 files changed, 537 insertions, 360 deletions
diff --git a/config/grootstream.yaml b/config/grootstream.yaml
index a9e2791..22be603 100644
--- a/config/grootstream.yaml
+++ b/config/grootstream.yaml
@@ -1,39 +1,44 @@
grootstream:
- engine:
- http_con_pool:
- max_total: 400
- max_per_route: 60
- connection_request_timeout: 60000
- connect_timeout: 60000
- socket_timeout: 60000
- knowledge_base:
- - name: TSG_asnlookup
- type: asnlookup
- properties:
- fs_type: hos
- fs_default_path: http://path
- files:
- default_mmdb_file: 7ce2f9890950ba90-fcc25696bf11a8a0
- user_defined_mmdb_file: 7ce2f9890950ba90-71f13b3736863ddb
- - name: CN_asnlookup
- type: asnlookup
- files:
- default_mmdb_file: 7ce2f9890950ba90-fcc25696bf11a8a0
- user_defined_mmdb_file: 7ce2f9890950ba90-71f13b3736863ddb
nacos:
server_addr: 192.168.44.12:8848
username: nacos
password: nacos
- namespace: test
- data_id: knowledge_base.json
- group: DEFAULT_GROUP
+
+
read_timeout: 5000
consul:
server_addr: 192.168.41.30
server_port: 8500
token: c989b503-1d74-f49c-5698-e90fe461e938
- hdfs:
- servers: 192.168.44.11:9000,192.168.44.14:9000
+ knowledge_base:
+ - name: TSG_asnlookup
+ type: asnlookup
+ properties:
+ fs_type: hos
+ fs_default_path: http://path
+ files:
+ default_mmdb_file: 7ce2f9890950ba90-fcc25696bf11a8a0
+ user_defined_mmdb_file: 7ce2f9890950ba90-71f13b3736863ddb
+ - name: CN_asnlookup
+ type: asnlookup
+ files:
+ default_mmdb_file: 7ce2f9890950ba90-fcc25696bf11a8a0
+ user_defined_mmdb_file: 7ce2f9890950ba90-71f13b3736863ddb
+ properties:
+ hos.path: //path
+ hos.traffic.file.bucket: bucket
+#grootstream.properties.hos_path
+
+
+
+
+
+
+
+
+
+
+
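The relocated `knowledge_base` section and the new top-level `properties` block above are parsed into `EngineConfig` (see the `ServerConfigOptions.PROPERTIES` and `parsePropertiesConfig` changes below). Judging by the trailing `#grootstream.properties.hos_path` comment and the prefix handling added to PathCombine.java further down, these keys are meant to be referenced from job configs as `grootstream.properties.<key>`. A sketch of that pairing — the job template in this commit still uses `global.*` names, so the reference syntax below is an assumption based on the Java side:

# grootstream.yaml: engine-level globals
grootstream:
  properties:
    hos.path: //path
    hos.traffic.file.bucket: bucket

# job config: a PATH_COMBINE entry resolving those globals (assumed syntax)
- function: PATH_COMBINE
  lookup_fields: [ http_request_body ]
  output_fields: [ http_request_body ]
  parameters:
    path: [ grootstream.properties.hos.path, grootstream.properties.hos.traffic.file.bucket, http_request_body ]
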
diff --git a/config/grootstream_job_template.yaml b/config/grootstream_job_template.yaml
index c125ebb..ad96a07 100644
--- a/config/grootstream_job_template.yaml
+++ b/config/grootstream_job_template.yaml
@@ -6,8 +6,8 @@ sources:
# watermark_timestamp_unit: ms # [string] Watermark Unit, default is ms
# watermark_lag: 60 # [number] Watermark Lag, default is 60
properties: # [object] Source Properties
- topic: SESSION-RECORD-COMPLETED
- kafka.bootstrap.servers: 192.168.44.11:9092
+ topic: SESSION-RECORD
+ kafka.bootstrap.servers: 192.168.44.12:9092
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
@@ -95,59 +95,99 @@ processing_pipelines:
functions: # [array of object] Function List
- function: ASN_LOOKUP
- lookup_fields: [server_ip]
- output_fields: [server_asn]
+ lookup_fields: [ server_ip ]
+ output_fields: [ server_asn ]
+ parameters:
+ option: IP_TO_ASN
+ vendor_id: tsg_asnlookup
+
+ - function: ASN_LOOKUP
+ lookup_fields: [ client_ip ]
+ output_fields: [ client_asn ]
parameters:
option: IP_TO_ASN
- vendor_id : tsg_asnlookup
+ vendor_id: tsg_asnlookup
+
- function: SNOWFLAKE_ID
lookup_fields: [ '' ]
- output_fields: [ common_log_id ]
+ output_fields: [ log_id ]
filter:
- - function: EVAL
- output_fields: [ common_internal_ip ]
- parameters:
- value_expression: 'common_direction=69 ? common_client_ip : common_server_ip'
- - function: EVAL
- output_fields: [ common_external_ip ]
- parameters:
- value_expression: 'common_direction=73 ? common_client_ip : common_server_ip'
+
- function: JSON_EXTRACT
- lookup_fields: [ common_device_tag ]
- output_fields: [ common_data_center ]
+ lookup_fields: [ device_tag ]
+ output_fields: [ data_center ]
filter:
parameters:
param: $.tags[?(@.tag=='data_center')][0].value
+
- function: JSON_EXTRACT
- lookup_fields: [ common_device_tag ]
- output_fields: [ common_device_group ]
+ lookup_fields: [ device_tag ]
+ output_fields: [ device_group ]
filter:
parameters:
param: $.tags[?(@.tag=='device_group')][0].value
+
- function: UNIX_TIMESTAMP_FUNCTION
- output_fields: [ common_processing_time ]
+ output_fields: [ processing_time ]
parameters:
precision: seconds
+
+ - function: UNIX_TIMESTAMP_CONVERTER
+ lookup_fields: [ __timestamp ]
+ output_fields: [ recv_time ]
+ parameters:
+ precision: seconds
+
- function: EVAL
- output_fields: [ common_recv_time ]
+ output_fields: [ recv_time ]
parameters:
- value_expression: 'common_ingestion_time'
+ value_expression: 'ingestion_time'
+
- function: DOMAIN
- lookup_fields: [ http_host,ssl_sni,quic_sni ]
- output_fields: [ common_server_domain ]
+ lookup_fields: [ http_host,ssl_sni,dtls_sni,quic_sni ]
+ output_fields: [ server_domain ]
parameters:
option: FIRST_SIGNIFICANT_SUBDOMAIN
- - function: EVAL
- output_fields: [ http_domain ]
- parameters:
- value_expression: 'common_server_domain'
+
- function: BASE64_DECODE_TO_STRING
lookup_fields: [ mail_subject,mail_subject_charset ]
output_fields: [ mail_subject ]
+
- function: BASE64_DECODE_TO_STRING
lookup_fields: [ mail_attachment_name,mail_attachment_name_charset ]
output_fields: [ mail_attachment_name ]
+ - function: PATH_COMBINE
+ lookup_fields: [ packet_capture_file ]
+ output_fields: [ packet_capture_file ]
+ parameters:
+ path: [ global.hos_path, global.pcap_file_bucket_name, packet_capture_file ]
+
+ - function: PATH_COMBINE
+ lookup_fields: [ rtp_pcap_path ]
+ output_fields: [ rtp_pcap_path ]
+ parameters:
+ path: [ global.hos_path, global.pcap_file_bucket_name, rtp_pcap_path ]
+
+ - function: PATH_COMBINE
+ lookup_fields: [ http_request_body ]
+ output_fields: [ http_request_body ]
+ parameters:
+ path: [ global.hos_path, global.traffic_file_bucket, http_request_body ]
+
+ - function: PATH_COMBINE
+ lookup_fields: [ http_response_body ]
+ output_fields: [ http_response_body ]
+ parameters:
+ path: [ global.hos_path, global.traffic_file_bucket, http_response_body ]
+
+ - function: PATH_COMBINE
+ lookup_fields: [ mail_eml_file ]
+ output_fields: [ mail_eml_file ]
+ parameters:
+ path: [ global.hos_path, global.traffic_file_bucket, mail_eml_file ]
+
+
sinks:
kafka_sink_a:
type: kafka
@@ -195,11 +235,11 @@ application: # [object] Application Configuration
env: # [object] Environment Variables
execution.parallelism: 1 # [number] Job-Level Parallelism
topology: # [array of object] Node List. It will be used build data flow for job dag graph.
- - name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE.
+ - name: kafka_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE.
parallelism: 1 # [number] Operator-Level Parallelism.
- downstream: [schema_type_filter] # [array of string] Downstream Node Name List.
- - name: schema_type_filter
- parallelism: 1
+ # downstream: [schema_type_filter] # [array of string] Downstream Node Name List.
+ #- name: schema_type_filter
+ # parallelism: 1
downstream: [session_record_processor]
- name: session_record_processor
parallelism: 1
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/EngineConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/EngineConfig.java
index 3591ce6..a286086 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/EngineConfig.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/EngineConfig.java
@@ -4,6 +4,7 @@ import lombok.Data;
import java.io.Serializable;
import java.util.List;
+import java.util.Map;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -20,6 +21,7 @@ public class EngineConfig implements Serializable {
private ConsulConfig consulConfig = ServerConfigOptions.CONSUL.defaultValue();
private ZookeeperConfig zookeeperConfig = ServerConfigOptions.ZOOKEEPER.defaultValue();
private HdfsConfig hdfsConfig = ServerConfigOptions.HDFS.defaultValue();
+ private Map<String,String> propertiesConfig = ServerConfigOptions.PROPERTIES.defaultValue();
public void setKnowledgeBaseConfig(List<KnowledgeConfig> knowledgeBaseConfig) {
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/ServerConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/ServerConfigOptions.java
index 04bc7ec..2c03b21 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/ServerConfigOptions.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/ServerConfigOptions.java
@@ -197,6 +197,10 @@ public class ServerConfigOptions {
.defaultValue(new HdfsConfig())
.withDescription("The hdfs configuration.");
-
+ public static final Option<Map<String, String>> PROPERTIES =
+ Options.key("properties")
+ .mapType()
+ .defaultValue(new HashMap<String,String>())
+ .withDescription("The properties of grootstream");
}
diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamDomConfigProcessor.java b/groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamDomConfigProcessor.java
index ccdd58a..843863f 100644
--- a/groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamDomConfigProcessor.java
+++ b/groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamDomConfigProcessor.java
@@ -23,7 +23,7 @@ public class YamlGrootStreamDomConfigProcessor extends AbstractDomConfigProcesso
this.config = config;
}
- @Override
+/* @Override
public void buildConfig(Node rootNode) {
for (Node node : childElements(rootNode)) {
String nodeName = cleanNodeName(node);
@@ -38,18 +38,20 @@ public class YamlGrootStreamDomConfigProcessor extends AbstractDomConfigProcesso
occurrenceSet.add(nodeName);
}
}
- }
+ }*/
- private boolean handleNode(Node node, String name) {
- if (GrootStreamConfigSections.ENGINE.isEqual(name)) {
+/* private boolean handleNode(Node node, String name) {
+ if (GrootStreamConfigSections.GROOTSTREAM.isEqual(name)) {
parseEngineConfig(node, config);
} else {
return true;
}
+ parseEngineConfig(node, config);
+
return false;
- }
+ }*/
- private void parseEngineConfig(Node engineNode, GrootStreamConfig config) {
+ public void buildConfig(Node engineNode) {
final EngineConfig engineConfig = config.getEngineConfig();
for (Node node : childElements(engineNode)) {
String name = cleanNodeName(node);
@@ -65,6 +67,8 @@ public class YamlGrootStreamDomConfigProcessor extends AbstractDomConfigProcesso
engineConfig.setZookeeperConfig(parseZookeeperConfig(node));
} else if (ServerConfigOptions.HDFS.key().equals(name)) {
engineConfig.setHdfsConfig(parseHdfsConfig(node));
+ } else if (ServerConfigOptions.PROPERTIES.key().equals(name)) {
+ engineConfig.setPropertiesConfig(parsePropertiesConfig(node));
} else {
LOGGER.warning("Unrecognized configuration element: " + name);
@@ -73,6 +77,16 @@ public class YamlGrootStreamDomConfigProcessor extends AbstractDomConfigProcesso
}
}
+ private Map<String, String> parsePropertiesConfig(Node properties) {
+
+ Map<String, String> propertiesMap = new HashMap<>();
+ for (Node node : childElements(properties)) {
+ String name = cleanNodeName(node);
+ propertiesMap.put(name,getTextContent(node));
+ }
+ return propertiesMap;
+ }
+
private ZookeeperConfig parseZookeeperConfig(Node zookeeperNode) {
ZookeeperConfig zookeeperConfig = new ZookeeperConfig();
diff --git a/groot-common/src/main/resources/groot-platform-plugin b/groot-common/src/main/resources/groot-platform-plugin
index bebf1e9..d22c057 100644
--- a/groot-common/src/main/resources/groot-platform-plugin
+++ b/groot-common/src/main/resources/groot-platform-plugin
@@ -6,4 +6,6 @@ com.geedgenetworks.core.udf.JsonExtract
com.geedgenetworks.core.udf.UnixTimestamp
com.geedgenetworks.core.udf.Domain
com.geedgenetworks.core.udf.DecodeBase64
-com.geedgenetworks.core.udf.GeoIpLookup \ No newline at end of file
+com.geedgenetworks.core.udf.GeoIpLookup
+com.geedgenetworks.core.udf.PathCombine
+com.geedgenetworks.core.udf.UnixTimestampConverter
diff --git a/groot-common/src/main/resources/grootstream.yaml b/groot-common/src/main/resources/grootstream.yaml
index d0906bc..7ce881c 100644
--- a/groot-common/src/main/resources/grootstream.yaml
+++ b/groot-common/src/main/resources/grootstream.yaml
@@ -1,53 +1,25 @@
grootstream:
- engine:
- tick_tuple_freq_secs: 90
- gtpc_scan_max_rows: 10000
- radius_scan_max_rows: 10000
- radius_table_name: RADIUS-TABLE
- gtpc_table_name: GTPC-TABLE
- hbase_rpc_timeout: 60000
- http_con_pool:
- max_total: 400
- max_per_route: 60
- connection_request_timeout: 60000
- connect_timeout: 60000
- socket_timeout: 60000
- knowledge_base:
- - name: tsg_asnlookup
- type: asnlookup
- properties:
- fs_type: hos
- fs_default_path: http://path
- files:
- default_mmdb_file: 7ce2f9890950ba90-fcc25696bf11a8a0
- user_defined_mmdb_file: 7ce2f9890950ba90-71f13b3736863ddb
- - name: tsg_geoiplookup
- type: geoiplookup
- files:
- default_mmdb_file: 7ce2f9890950ba90-fcc25696bf11a8a0
- user_defined_mmdb_file: 7ce2f9890950ba90-71f13b3736863ddb
-
- snowflake:
- data_center_id_num: 1
- nacos:
- server_addr: 192.168.44.12:8848
- username: nacos
- password: nacos
- namespace: test
- data_id: knowledge_base.json
- group: DEFAULT_GROUP
- read_timeout: 5000
- consul:
- server_addr: 192.168.41.30
- server_port: 8500
- token: c989b503-1d74-f49c-5698-e90fe461e938
- hos:
- token: c21f969b5f03d33d43e04f8f136e7682
- zookeeper:
- quorum: 192.168.44.12:2181
- hdfs:
- servers: 192.168.44.11:9000,192.168.44.14:9000
-
-
-
+ http_con_pool:
+ max_total: 400
+ max_per_route: 60
+ connection_request_timeout: 60000
+ connect_timeout: 60000
+ socket_timeout: 60000
+ knowledge_base:
+ - name: tsg_asnlookup
+ type: asnlookup
+ properties:
+ fs_type: hos
+ fs_default_path: http://path
+ files:
+ 7ce2f9890950ba90-fcc25696bf11a8a0
+ 7ce2f9890950ba90-71f13b3736863ddb
+ - name: tsg_geoiplookup
+ type: geoiplookup
+ files:
+ 7ce2f9890950ba90-fcc25696bf11a8a0
+ 7ce2f9890950ba90-71f13b3736863ddb
+ properties:
+ hos.path: //path
+ hos.traffic.file.bucket: bucket \ No newline at end of file
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/KnowledgeBaseEntity.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/KnowledgeBaseEntity.java
new file mode 100644
index 0000000..523157c
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/KnowledgeBaseEntity.java
@@ -0,0 +1,18 @@
+package com.geedgenetworks.core.pojo;
+
+import com.geedgenetworks.common.config.KnowledgeConfig;
+import com.geedgenetworks.core.utils.KnowlegdeBase.AbstractKnowledgeBase;
+import lombok.Data;
+
+import java.util.List;
+@Data
+public class KnowledgeBaseEntity {
+
+ private List<KnowLedgeEntity> knowLedgeEntityList;
+ private KnowledgeConfig knowledgeConfig;
+ private AbstractKnowledgeBase abstractKnowledgeBase;
+
+
+
+
+}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
index a81bbd3..5a0637d 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java
@@ -10,7 +10,7 @@ import com.geedgenetworks.core.pojo.Event;
import com.geedgenetworks.core.pojo.ProjectionConfig;
import com.geedgenetworks.core.pojo.UDFContext;
import com.geedgenetworks.core.udf.UDF;
-import com.geedgenetworks.core.utils.KnowlegdeBase.KnowledgeBaseSchedule;
+import com.geedgenetworks.core.utils.KnowlegdeBase.KnowledgeBaseUpdateJob;
import com.googlecode.aviator.AviatorEvaluator;
import com.googlecode.aviator.AviatorEvaluatorInstance;
import com.googlecode.aviator.Expression;
@@ -18,9 +18,12 @@ import com.googlecode.aviator.Options;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
+import org.quartz.SchedulerException;
import java.util.LinkedList;
+import static com.geedgenetworks.core.utils.SchedulerUtils.shutdownScheduler;
+
public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
private static final Log logger = LogFactory.get();
@@ -37,7 +40,7 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
this.functions = new LinkedList<>();
try {
- KnowledgeBaseSchedule.getInstance();
+ // KnowledgeBaseUpdateJob.getInstance();
for (UDFContext udfContext : projectionConfig.getFunctions()) {
Expression compiledExp = null;
UdfEntity udfEntity = new UdfEntity();
@@ -121,6 +124,11 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> {
udfEntity.getUdfFunction().close();
}
+ try {
+ shutdownScheduler();
+ } catch (SchedulerException e) {
+ throw new RuntimeException(e);
+ }
super.close();
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
index 14897ff..fd767cc 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java
@@ -12,14 +12,18 @@ import com.geedgenetworks.core.utils.KnowlegdeBase.AsnKnowledgeBase;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.geedgenetworks.core.utils.KnowlegdeBase.KnowledgeBaseSchedule;
+import com.geedgenetworks.core.utils.KnowlegdeBase.KnowledgeBaseUpdateJob;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
+import org.quartz.SchedulerException;
+import static com.geedgenetworks.core.utils.SchedulerUtils.shutdownScheduler;
+
+@Slf4j
public class AsnLookup implements UDF {
private UDFContext udfContext;
- private static final Log logger = LogFactory.get();
private String vender;
private String option;
@@ -30,18 +34,7 @@ public class AsnLookup implements UDF {
this.udfContext = udfContext;
this.vender = udfContext.getParameters().get("vendor_id").toString();
this.option = udfContext.getParameters().get("option").toString();
- Configuration configuration = (Configuration) runtimeContext
- .getExecutionConfig().getGlobalJobParameters();
- EngineConfig engineConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), EngineConfig.class);
- for(KnowledgeConfig knowledgeConfig : engineConfig.getKnowledgeBaseConfig()){
- if(vender.equals(knowledgeConfig.getName())){
- KnowledgeBaseSchedule.initKnowledgeBase(knowledgeConfig);
- }
- else {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param vendor_id value is not correct");
- }
- }
-
+ KnowledgeBaseUpdateJob.initKnowledgeBase(vender,AsnKnowledgeBase.getInstance(), runtimeContext);
}
@@ -78,6 +71,11 @@ public class AsnLookup implements UDF {
@Override
public void close() {
+ try {
+ shutdownScheduler();
+ } catch (SchedulerException e) {
+ throw new RuntimeException(e);
+ }
}
private void checkUdfContext(UDFContext udfContext) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java
index 6a5ea60..128eea8 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java
@@ -8,14 +8,15 @@ import com.geedgenetworks.utils.StringUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
import java.io.UnsupportedEncodingException;
import java.util.Base64;
+@Slf4j
public class DecodeBase64 implements UDF {
- private static final Log logger = LogFactory.get();
private UDFContext udfContext;
@Override
@@ -51,9 +52,9 @@ public class DecodeBase64 implements UDF {
}
}
} catch (RuntimeException e) {
- logger.error("Resolve Base64 exception, exception information:" + e.getMessage());
+ log.error("Resolve Base64 exception, exception information:" + e.getMessage());
} catch (UnsupportedEncodingException e) {
- logger.error(
+ log.error(
"The Character Encoding ["
+ charset.toString()
+ "] is not supported.exception information:"
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java
index ffcbedb..e7a5c37 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java
@@ -7,15 +7,16 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.core.pojo.Event;
import com.geedgenetworks.core.pojo.UDFContext;
import com.geedgenetworks.utils.FormatUtils;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
+@Slf4j
public class Domain implements UDF {
private UDFContext udfContext;
private String option;
- private static final Log logger = LogFactory.get();
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
@@ -35,7 +36,8 @@ public class Domain implements UDF {
}
else{
if(!udfContext.getParameters().get("option").toString().equals("TOP_LEVEL_DOMAIN") &&
- !udfContext.getParameters().get("option").toString().equals("FIRST_SIGNIFICANT_SUBDOMAIN")){
+ !udfContext.getParameters().get("option").toString().equals("FIRST_SIGNIFICANT_SUBDOMAIN") &&
+ !udfContext.getParameters().get("option").toString().equals("FQDN")){
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters option value is not correct");
}
}
@@ -68,7 +70,7 @@ public class Domain implements UDF {
event.getExtractedFields()
.get(lookupField));
break;
- default:
+ case "FQDN":
domain = (String) event.getExtractedFields()
.get(lookupField);
break;
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
index 4f09969..30de35d 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java
@@ -8,42 +8,44 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.core.pojo.Event;
import com.geedgenetworks.core.pojo.UDFContext;
import com.geedgenetworks.core.utils.KnowlegdeBase.IpKnowledgeBase;
-import com.geedgenetworks.core.utils.KnowlegdeBase.KnowledgeBaseSchedule;
+import com.geedgenetworks.core.utils.KnowlegdeBase.KnowledgeBaseUpdateJob;
+import com.geedgenetworks.domain.LocationResponse;
import com.geedgenetworks.utils.StringUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
+import org.quartz.SchedulerException;
import java.util.HashMap;
+import java.util.Map;
+import static com.geedgenetworks.core.utils.SchedulerUtils.shutdownScheduler;
+
+@Slf4j
public class GeoIpLookup implements UDF {
private UDFContext udfContext;
- private static final Log logger = LogFactory.get();
private String vender;
private String option;
+ private Map<String,String> geolocation_field_mapping;
+
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
checkUdfContext(udfContext);
this.udfContext = udfContext;
this.vender = udfContext.getParameters().get("vendor_id").toString();
this.option = udfContext.getParameters().get("option").toString();
- Configuration configuration = (Configuration) runtimeContext
- .getExecutionConfig().getGlobalJobParameters();
- EngineConfig engineConfig = com.alibaba.fastjson.JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), EngineConfig.class);
- for(KnowledgeConfig knowledgeConfig : engineConfig.getKnowledgeBaseConfig()){
- if(vender.equals(knowledgeConfig.getName())){
- KnowledgeBaseSchedule.initKnowledgeBase(knowledgeConfig);
- }
- else {
- throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param vendor_id value is not correct");
- }
+ if(option.equals("IP_TO_OBJECT")){
+ this.geolocation_field_mapping = (Map<String, String>) udfContext.getParameters().get("geolocation_field_mapping");
}
+ KnowledgeBaseUpdateJob.initKnowledgeBase(vender,IpKnowledgeBase.getInstance(),runtimeContext);
+
}
@@ -134,13 +136,10 @@ public class GeoIpLookup implements UDF {
.getLookup_fields()
.get(0))
.toString());
- String[] latLng = geo.split(",");
- if (latLng.length == 2) {
- event.getExtractedFields()
- .put(udfContext.getOutput_fields().get(0), latLng[0]);
+
event.getExtractedFields()
- .put(udfContext.getOutput_fields().get(1), latLng[1]);
- }
+ .put(udfContext.getOutput_fields().get(0), geo);
+
break;
case "IP_TO_PROVIDER":
HashMap<String, Object> serverIpMap =
@@ -176,21 +175,64 @@ public class GeoIpLookup implements UDF {
.toString()));
break;
case "IP_TO_OBJECT":
- event.getExtractedFields()
- .put(
- udfContext.getOutput_fields().get(0),
- IpKnowledgeBase.getVenderWithIpLookup()
- .get(vender)
- .infoLookup(
- event.getExtractedFields()
- .get(
- udfContext
- .getLookup_fields()
- .get(0))
- .toString()));
- break;
- default:
- break;
+
+ LocationResponse response = (LocationResponse) IpKnowledgeBase.getVenderWithIpLookup()
+ .get(vender)
+ .infoLookup(
+ event.getExtractedFields()
+ .get(
+ udfContext
+ .getLookup_fields()
+ .get(0))
+ .toString());
+
+ for (Map.Entry<String, String> entry : geolocation_field_mapping.entrySet()) {
+ switch (entry.getKey()) {
+ case "COUNTRY":
+ event.getExtractedFields()
+ .put(entry.getValue(), response.getCountry());
+ break;
+ case "PROVINCE":
+ event.getExtractedFields()
+ .put(entry.getValue(), response.getSuperAdministrativeArea());
+ break;
+ case "CITY":
+ event.getExtractedFields()
+ .put(entry.getValue(), response.getAdministrativeArea());
+ break;
+ case "LONGITUDE":
+ event.getExtractedFields()
+ .put(entry.getValue(), response.getLongitude());
+ break;
+ case "LATITUDE":
+ event.getExtractedFields()
+ .put(entry.getValue(), response.getLatitude());
+ break;
+ case "ISP":
+ event.getExtractedFields()
+ .put(entry.getValue(), response.getIsp());
+ break;
+ case "ORGANIZATION ":
+ event.getExtractedFields()
+ .put(entry.getValue(), response.getOrganization());
+ break;
+ default:
+ break;
+ }
+ event.getExtractedFields()
+ .put(
+ udfContext.getOutput_fields().get(0),
+ IpKnowledgeBase.getVenderWithIpLookup()
+ .get(vender)
+ .infoLookup(
+ event.getExtractedFields()
+ .get(
+ udfContext
+ .getLookup_fields()
+ .get(0))
+ .toString()));
+ break;
+ }
}
}
return event;
@@ -231,7 +273,22 @@ public class GeoIpLookup implements UDF {
throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey geolocation_field_mapping");
}
+ else {
+ Map<String,String> geolocation_field_mapping =(Map<String, String>) udfContext.getParameters().get("geolocation_field_mapping");
+
+ if(!geolocation_field_mapping.isEmpty()){
+ for (Map.Entry<String, String> entry : geolocation_field_mapping.entrySet()) {
+
+ if(!entry.getKey().equals("COUNTRY") && !entry.getKey().equals("PROVINCE") && !entry.getKey().equals("CITY") && !entry.getKey().equals("LONGITUDE") && !entry.getKey().equals("LATITUDE") && !entry.getKey().equals("ISP") && !entry.getKey().equals("ORGANIZATION")){
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters geolocation_field_mapping value is not correct");
+ }
+ }
+ }
+ else {
+ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters geolocation_field_mapping value is not correct");
+ }
+ }
}
}
@@ -245,6 +302,10 @@ public class GeoIpLookup implements UDF {
@Override
public void close() {
-
+ try {
+ shutdownScheduler();
+ } catch (SchedulerException e) {
+ throw new RuntimeException(e);
+ }
}
}
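The new `IP_TO_OBJECT` branch fans a single `LocationResponse` out into several event fields through the `geolocation_field_mapping` parameter; only the keys COUNTRY, PROVINCE, CITY, LONGITUDE, LATITUDE, ISP and ORGANIZATION pass the validation added in `checkUdfContext`. A sketch of the corresponding job-config entry — the function name and the output field names are hypothetical, since this commit's job template contains no GeoIP entry:

- function: GEOIP_LOOKUP            # hypothetical name
  lookup_fields: [ server_ip ]
  output_fields: [ server_geolocation ]
  parameters:
    option: IP_TO_OBJECT
    vendor_id: tsg_geoiplookup
    geolocation_field_mapping:
      COUNTRY: server_country
      PROVINCE: server_province
      CITY: server_city
      LONGITUDE: server_longitude
      LATITUDE: server_latitude
      ISP: server_isp
      ORGANIZATION: server_organization
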
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
index 6af2741..059088d 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java
@@ -5,11 +5,12 @@ import com.geedgenetworks.common.Constants;
import com.geedgenetworks.common.config.EngineConfig;
import com.geedgenetworks.core.pojo.Event;
import com.geedgenetworks.core.pojo.UDFContext;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import java.util.*;
-
+@Slf4j
public class PathCombine implements UDF {
private UDFContext udfContext;
@@ -18,28 +19,36 @@ public class PathCombine implements UDF {
private Map<String, String> pathParameters = new LinkedHashMap<>();
+
+
+
@Override
public void open(RuntimeContext runtimeContext, UDFContext udfContext) {
this.udfContext = udfContext;
-/*
+
Configuration configuration = (Configuration) runtimeContext
.getExecutionConfig().getGlobalJobParameters();
EngineConfig engineConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), EngineConfig.class);
-*/
-
+ Map<String,String> propertiesConfig =engineConfig.getPropertiesConfig();
if (udfContext.getParameters() != null
&& !udfContext.getParameters().isEmpty()) {
Object object = udfContext.getParameters().get("path");
- ArrayList<Object> arrayList = new ArrayList<>(Arrays.asList(object));
+ ArrayList<Object> arrayList = new ArrayList<>(Collections.singletonList(object));
for (Object key : arrayList) {
String column =key.toString();
- if(column.contains("global")){
- pathParameters.put(column,"");// global variable, to be defined
+ if(column.startsWith("grootstream.properties")){
+ String propertiesConfigKey = column.replaceAll("grootstream.properties.","").trim();
+ if(propertiesConfig.containsKey(propertiesConfigKey)) {
+ pathParameters.put(column,propertiesConfig.get(propertiesConfigKey));// global variable, to be defined
+ }
+ else {
+ throw new RuntimeException("propertiesConfigKey "+propertiesConfigKey+" not found ");
+ }
}
else {
- pathParameters.put(column,"dynamic");
+ pathParameters.put(column,"");
}
}
@@ -61,7 +70,7 @@ public class PathCombine implements UDF {
public Event evaluate(Event event) {
StringBuilder stringBuilder = new StringBuilder();
for (Map.Entry<String, String> entry : pathParameters.entrySet()) {
- if (entry.getValue().equals("dynamic")) {
+ if (entry.getValue().isEmpty()) {
stringBuilder.append(event.getExtractedFields().getOrDefault(entry.getKey(),"").toString());
}
else {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java
index c0ad7f1..626de27 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java
@@ -74,7 +74,7 @@ public class UnixTimestampConverter implements UDF {
@Override
public String functionName() {
- return "UNIX_TIMESTAMP_FUNCTION";
+ return "UNIX_TIMESTAMP_CONVERTER";
}
@Override
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java
index 00b2b53..2c0137e 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java
@@ -1,5 +1,6 @@
package com.geedgenetworks.core.utils.KnowlegdeBase;
+import com.alibaba.fastjson2.JSON;
import com.geedgenetworks.common.config.KnowledgeConfig;
import com.geedgenetworks.common.utils.HttpClientUtils;
import com.geedgenetworks.core.pojo.KnowLedgeEntity;
@@ -60,15 +61,23 @@ public abstract class AbstractKnowledgeBase {
// HttpClientUtils httpClientUtils = new HttpClientUtils();
// String metadate = httpClientUtils.httpGet(URI.create("/v1/knowledge_base/"+ id +"/meta")) ;
// temporarily hard-coded
- KnowLedgeEntity knowLedgeEntity = new KnowLedgeEntity();
- knowLedgeEntity.setIsValid(1);
- knowLedgeEntity.setSha256("c06db87e9a914a8296ca892880c353884ac15b831c485c29fad9889ffb4e0fa8");
- knowLedgeEntity.setPath("http://192.168.44.12:9098/hos/knowledge_base_hos_bucket/4bd16ddb-d1cb-4310-b620-02164335c50d-aXBfYnVpbHRpbg==.mmdb");
- /* knowLedgeEntity.setSha256("2a2defd9c96ea5434ac628d3dc4fc6790e30d84c218b02fe798dbc6772dbf76c");
- knowLedgeEntity.setPath("http://192.168.44.12:9098/hos/knowledge_base_hos_bucket/1e8aabe7-2170-445c-9124-6051332ca7d6-YXNuX2J1aWx0aW4=.mmdb");*/
- // List<KnowLedgeEntity> knowLedgeEntityList = JSON.parseObject(metadate, new TypeReference<List<KnowLedgeEntity>>() {});
- // return knowLedgeEntityList.get(0);
- return knowLedgeEntity;
+ KnowLedgeEntity knowLedgeEntity1 = new KnowLedgeEntity();
+ knowLedgeEntity1.setIsValid(1);
+ knowLedgeEntity1.setSha256("2a2defd9c96ea5434ac628d3dc4fc6790e30d84c218b02fe798dbc6772dbf76c");
+ knowLedgeEntity1.setPath("http://192.168.44.12:9098/hos/knowledge_base_bucket/84ec56db-8f3b-440a-af79-15148d7cd3c8-YXNuX2J1aWx0aW4=.mmdb");
+
+/*
+ KnowLedgeEntity knowLedgeEntity2 = new KnowLedgeEntity();
+ knowLedgeEntity2.setIsValid(1);
+ knowLedgeEntity2.setSha256("044228128a1cc7ae0f8d4bae9fdc73eac9dea3e074e7c15796d29167b89cfdee");
+ knowLedgeEntity2.setPath("http://192.168.44.12:9098/hos/knowledge_base_bucket/63220f3e-7792-43a8-9921-d01da16b76b2-aXBfYnVpbHRpbg==.mmdb");
+*/
+
+ /* knowLedgeEntity.setSha256("2a2defd9c96ea5434ac628d3dc4fc6790e30d84c218b02fe798dbc6772dbf76c");
+ knowLedgeEntity.setPath("http://192.168.44.12:9098/hos/knowledge_base_hos_bucket/1e8aabe7-2170-445c-9124-6051332ca7d6-YXNuX2J1aWx0aW4=.mmdb");
+ /* List<KnowLedgeEntity> knowLedgeEntityList = JSON.parseObject(metadate, new TypeReference<List<KnowLedgeEntity>>() {});
+ return knowLedgeEntityList.get(0);*/
+ return knowLedgeEntity1;
} catch (Exception e) {
logger.error("get file Metadata " + id + " error: " + e.getMessage());
}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java
index a62acf3..12b29ce 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java
@@ -16,13 +16,13 @@ import java.util.concurrent.ConcurrentHashMap;
public class AsnKnowledgeBase extends AbstractKnowledgeBase {
- private static final Logger LOG = LoggerFactory.getLogger(AsnKnowledgeBase.class);
@Getter
private static Map<String, IpLookupV2> venderWithAsnLookup = new ConcurrentHashMap<>();
private static final Logger logger = LoggerFactory.getLogger(AsnKnowledgeBase.class);
private static AsnKnowledgeBase instance;
// private constructor to prevent external instantiation
+
public static synchronized AsnKnowledgeBase getInstance() {
if (instance == null) {
instance = new AsnKnowledgeBase();
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseSchedule.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseSchedule.java
deleted file mode 100644
index ccbf1a5..0000000
--- a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseSchedule.java
+++ /dev/null
@@ -1,163 +0,0 @@
-package com.geedgenetworks.core.utils.KnowlegdeBase;
-
-import com.geedgenetworks.common.config.CommonConfig;
-import com.geedgenetworks.common.config.KnowledgeConfig;
-import com.geedgenetworks.core.pojo.KnowLedgeEntity;
-import lombok.Getter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
-import java.security.NoSuchAlgorithmException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-
-public class KnowledgeBaseSchedule {
-
- private static Timer timer;
-
- public static Map<String, KnowledgeConfig> knowledgeConfigMap = new ConcurrentHashMap<>();
- public static Map<String, List<KnowLedgeEntity>> KnowLedgeEntityMap = new ConcurrentHashMap<>();
- @Getter
- private static Map<String, String> knowledgeBaseClassReflect= new ConcurrentHashMap<>() ;
- private static final Logger logger = LoggerFactory.getLogger(KnowledgeBaseSchedule.class);
-
- private static KnowledgeBaseSchedule instance;
-
- public static synchronized KnowledgeBaseSchedule getInstance() {
- if (instance == null) {
- instance = new KnowledgeBaseSchedule();
- knowledgeBaseClassReflect =CommonConfig.KNOWLEDGE_BASE_CLASS_REFLECT;
- ScheduTask.startTask();
- }
- return instance;
- }
-
- public static void updateKnowledgeBase(String name) {
-
- try {
- // obtain the private constructor
- Class<?> cls = Class.forName(knowledgeBaseClassReflect.get(knowledgeConfigMap.get(name).getType()));
- Constructor<?> constructor = cls.getDeclaredConstructor();
- constructor.setAccessible(true);
- // knowledgeUtil instance = (knowledgeUtil) constructor.newInstance();
-
- Method createInstanceMethod = cls.getMethod("getInstance");
- AbstractKnowledgeBase instance = (AbstractKnowledgeBase) createInstanceMethod.invoke(null);
- // invoke the method on the instance
- instance.updateKnowledgeBase(knowledgeConfigMap.get(name));
- } catch (Exception e) {
- logger.error(e.getMessage());
- }
- }
-
-
-
- public static synchronized void initKnowledgeBase(KnowledgeConfig KnowledgeConfig) {
-
- if (instance == null) {
- getInstance();
- }
- if (!knowledgeConfigMap.containsKey(KnowledgeConfig.getName())) {
- List<KnowLedgeEntity> knowLedgeEntityList = new ArrayList<>();
- try {
- knowledgeConfigMap.put(KnowledgeConfig.getName(), KnowledgeConfig);
- updateKnowledgeBase(KnowledgeConfig.getName());
- for (String id : KnowledgeConfig.getFiles()) {
- KnowLedgeEntity knowLedgeEntity = getMetadata(id);
- knowLedgeEntityList.add(knowLedgeEntity);
- }
- KnowLedgeEntityMap.put(KnowledgeConfig.getName(), knowLedgeEntityList);
- } catch (Exception e) {
- logger.error("initKnowledgeBase " + KnowledgeConfig.getName() + " error: " + e.getMessage());
- }
- KnowLedgeEntityMap.put(KnowledgeConfig.getName(), knowLedgeEntityList);
- }
-
-
- }
- private static KnowLedgeEntity getMetadata(String id) {
-
- try{
-/* temporarily hard-coded
- HttpClientUtils httpClientUtils = new HttpClientUtils();
- String metadate = httpClientUtils.httpGet(URI.create("/v1/knowledge_base/"+ id +"/meta")) ;
- List<KnowLedgeEntity> knowLedgeEntityList = JSON.parseObject(metadate, new TypeReference<List<KnowLedgeEntity>>() {});
- return knowLedgeEntityList.get(0);
-*/
- KnowLedgeEntity knowLedgeEntity = new KnowLedgeEntity();
- knowLedgeEntity.setIsValid(1);
- knowLedgeEntity.setSha256("2a2defd9c96ea5434ac628d3dc4fc6790e30d84c218b02fe798dbc6772dbf76c");
- knowLedgeEntity.setPath("http://192.168.44.12:9098/hos/knowledge_base_hos_bucket/1e8aabe7-2170-445c-9124-6051332ca7d6-YXNuX2J1aWx0aW4=.mmdb");
- return knowLedgeEntity;
- } catch (Exception e) {
- logger.error("get file Metadata " + id + " error: " + e.getMessage());
- }
-
- return null;
- }
-
-
- public static synchronized Boolean checkUpdateMesssage (String name) throws NoSuchAlgorithmException {
-
- // temporarily hard-coded: Boolean changeFlag = false;
- Boolean changeFlag = true;
- if(!KnowLedgeEntityMap.get(name).isEmpty()) {
-
- for (KnowLedgeEntity knowLedgeEntity : KnowLedgeEntityMap.get(name)) {
-
- if (!knowLedgeEntity.getSha256().equals(getMetadata(knowLedgeEntity.getId()).getSha256())) {
- return true;
-
- }
- }
- }
- else {
- changeFlag = true;
- }
- return changeFlag;
-
- }
-
-
-
-
-
-
- static class ScheduTask extends TimerTask {
-
-
- @Override
- public void run() {
- for (Map.Entry<String, KnowledgeConfig> entry : knowledgeConfigMap.entrySet()) {
- Boolean flag = null;
- try {
- flag = checkUpdateMesssage(entry.getValue().getName());
- if(flag){
- updateKnowledgeBase(entry.getKey());
- }
- } catch (NoSuchAlgorithmException e) {
- logger.error(e.getMessage());
- } catch (Exception e) {
- logger.error(e.getMessage());
- }
- }
- }
-
- public static void startTask() {
- timer = new Timer();
- timer.schedule(new ScheduTask(), 0, 300000); // run the task every 300 seconds
- }
-
- public static void stopTask() {
- if (timer != null) {
- timer.cancel();
- timer = null;
- }
- }
- }
-
-}
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseUpdateJob.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseUpdateJob.java
new file mode 100644
index 0000000..98b48a2
--- /dev/null
+++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseUpdateJob.java
@@ -0,0 +1,191 @@
+package com.geedgenetworks.core.utils.KnowlegdeBase;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.TypeReference;
+import com.geedgenetworks.common.Constants;
+import com.geedgenetworks.common.config.CommonConfig;
+import com.geedgenetworks.common.config.EngineConfig;
+import com.geedgenetworks.common.config.KnowledgeConfig;
+import com.geedgenetworks.common.utils.HttpClientUtils;
+import com.geedgenetworks.core.pojo.KnowLedgeEntity;
+import com.geedgenetworks.core.pojo.KnowledgeBaseEntity;
+import lombok.Getter;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.quartz.Job;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.security.NoSuchAlgorithmException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static com.geedgenetworks.core.utils.KnowlegdeBase.KnowledgeBaseScheduler.startTask;
+
+
+public class KnowledgeBaseUpdateJob implements Job {
+
+
+/*
+ public static Map<String, KnowledgeConfig> knowledgeConfigMap = new ConcurrentHashMap<>();
+ public static Map<String, List<KnowLedgeEntity>> KnowLedgeEntityMap = new ConcurrentHashMap<>();
+
+*/
+
+ @Getter
+ private static Map<String, KnowledgeBaseEntity> KnowLedgeBaseEntityMap= new ConcurrentHashMap<>() ;
+ private static final Logger logger = LoggerFactory.getLogger(KnowledgeBaseUpdateJob.class);
+
+ private static KnowledgeBaseUpdateJob instance;
+
+ private static EngineConfig engineConfig;
+
+ public static synchronized KnowledgeBaseUpdateJob getInstance(RuntimeContext runtimeContext) {
+ if (instance == null) {
+ instance = new KnowledgeBaseUpdateJob();
+ // knowledgeBaseClassReflect =CommonConfig.KNOWLEDGE_BASE_CLASS_REFLECT;
+ Configuration configuration = (Configuration) runtimeContext
+ .getExecutionConfig().getGlobalJobParameters();
+ engineConfig = com.alibaba.fastjson.JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), EngineConfig.class);
+ startTask();
+ }
+ return instance;
+ }
+
+ public static void updateKnowledgeBase(String name) {
+
+ try {
+ // obtain the private constructor
+/* Class<?> cls = Class.forName(knowledgeBaseClassReflect.get(knowledgeConfigMap.get(name).getType()));
+ Constructor<?> constructor = cls.getDeclaredConstructor();
+ constructor.setAccessible(true);
+ Method createInstanceMethod = cls.getMethod("getInstance");
+ AbstractKnowledgeBase instance = (AbstractKnowledgeBase) createInstanceMethod.invoke(null);
+ // invoke the method on the instance
+ instance.updateKnowledgeBase(knowledgeConfigMap.get(name));*/
+
+
+ } catch (Exception e) {
+ logger.error(e.getMessage());
+ }
+ }
+
+
+
+ public static synchronized void initKnowledgeBase(String name,AbstractKnowledgeBase abstractKnowledgeBase,RuntimeContext runtimeContext) {
+
+ if (instance == null) {
+ getInstance(runtimeContext);
+ }
+
+ if (!KnowLedgeBaseEntityMap.containsKey(name)) {
+ List<KnowLedgeEntity> knowLedgeEntityList = new ArrayList<>();
+ KnowledgeBaseEntity knowledgeBaseEntity = new KnowledgeBaseEntity();
+ knowledgeBaseEntity.setAbstractKnowledgeBase(abstractKnowledgeBase);
+ knowledgeBaseEntity.setKnowLedgeEntityList(knowLedgeEntityList);
+
+ try {
+ for(KnowledgeConfig knowledgeConfig : engineConfig.getKnowledgeBaseConfig()){
+ if(name.equals(knowledgeConfig.getName())){
+ knowledgeBaseEntity.setKnowledgeConfig(knowledgeConfig);
+ break;
+ }
+ }
+ KnowLedgeBaseEntityMap.put(name, knowledgeBaseEntity);
+ for (String id : knowledgeBaseEntity.getKnowledgeConfig().getFiles()) {
+ // KnowLedgeEntity knowLedgeEntity = getMetadata(id);
+ KnowLedgeEntity knowLedgeEntity1 = new KnowLedgeEntity();
+ knowLedgeEntity1.setIsValid(1);
+ knowLedgeEntity1.setSha256("2a2defd9c96ea5434ac628d3dc4fc6790e30d84c218b02fe798dbc6772dbf76c");
+ knowLedgeEntity1.setPath("http://192.168.44.12:9098/hos/knowledge_base_bucket/84ec56db-8f3b-440a-af79-15148d7cd3c8-YXNuX2J1aWx0aW4=.mmdb");
+
+/*
+ KnowLedgeEntity knowLedgeEntity2 = new KnowLedgeEntity();
+ knowLedgeEntity2.setIsValid(1);
+ knowLedgeEntity2.setSha256("044228128a1cc7ae0f8d4bae9fdc73eac9dea3e074e7c15796d29167b89cfdee");
+ knowLedgeEntity2.setPath("http://192.168.44.12:9098/hos/knowledge_base_bucket/63220f3e-7792-43a8-9921-d01da16b76b2-aXBfYnVpbHRpbg==.mmdb");
+*/
+
+ knowLedgeEntityList.add(knowLedgeEntity1);
+ }
+
+
+ updateKnowledgeBase(name);
+ } catch (Exception e) {
+ logger.error("initKnowledgeBase " + e.getMessage());
+ }
+ }
+
+
+ }
+ private static KnowLedgeEntity getMetadata(String id) {
+
+ try{
+ HttpClientUtils httpClientUtils = new HttpClientUtils();
+ String metadate = httpClientUtils.httpGet(URI.create("/v1/knowledge_base/"+ id +"/meta")) ;
+ List<KnowLedgeEntity> knowLedgeEntityList = JSON.parseObject(metadate, new TypeReference<List<KnowLedgeEntity>>() {});
+ return knowLedgeEntityList.get(0);
+
+ /* KnowLedgeEntity knowLedgeEntity = new KnowLedgeEntity();
+ knowLedgeEntity.setIsValid(1);
+ knowLedgeEntity.setSha256("2a2defd9c96ea5434ac628d3dc4fc6790e30d84c218b02fe798dbc6772dbf76c");
+ knowLedgeEntity.setPath("http://192.168.44.12:9098/hos/knowledge_base_hos_bucket/1e8aabe7-2170-445c-9124-6051332ca7d6-YXNuX2J1aWx0aW4=.mmdb");
+ return knowLedgeEntity;*/
+ } catch (Exception e) {
+ logger.error("get file Metadata " + id + " error: " + e.getMessage());
+ }
+
+ return null;
+ }
+
+
+ public static synchronized Boolean checkUpdateMesssage (String name) {
+
+ System.out.println("check +++++++++++++++++++++++++++++++++++++");
+ /* Boolean changeFlag = false;
+ if(!KnowLedgeEntityMap.get(name).isEmpty()) {
+
+ for (KnowLedgeEntity knowLedgeEntity : KnowLedgeEntityMap.get(name)) {
+
+ KnowLedgeEntity metadata =getMetadata(knowLedgeEntity.getId());
+ if(metadata!=null) {
+ if (!knowLedgeEntity.getSha256().equals(metadata.getSha256())) {
+ return true;
+ }
+ }
+ else {
+ logger.error("get file Metadata " + knowLedgeEntity.getId() + " error " );
+ }
+ }
+ }
+ else {
+ changeFlag = true;
+ }*/
+ return true;
+
+ }
+
+ @Override
+ public void execute(JobExecutionContext jobExecutionContext) {
+ for (Map.Entry<String, KnowledgeBaseEntity> entry : KnowLedgeBaseEntityMap.entrySet()) {
+ Boolean flag = null;
+ try {
+ flag = checkUpdateMesssage(entry.getKey());
+ if(flag){
+ updateKnowledgeBase(entry.getKey());
+ }
+ } catch (Exception e) {
+ logger.error(e.getMessage());
+ }
+ }
+ }
+
+
+
+
+}
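`KnowledgeBaseUpdateJob` implements `org.quartz.Job`, but the pieces that actually drive it — `KnowledgeBaseScheduler.startTask()` and `SchedulerUtils.shutdownScheduler()`, statically imported above and in the UDF `close()` hooks — are not part of this commit. A minimal sketch of what that utility could look like with the Quartz 2.3.2 dependency added in pom.xml, assuming the same 300-second interval as the removed Timer-based `KnowledgeBaseSchedule` (the class, job, and trigger names here are hypothetical):

import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

public final class KnowledgeBaseScheduler {

    private static Scheduler scheduler;

    // Replaces the old Timer.schedule(task, 0, 300000): fire now, then every 300 seconds.
    public static synchronized void startTask() {
        try {
            if (scheduler == null) {
                scheduler = StdSchedulerFactory.getDefaultScheduler();
                JobDetail job = JobBuilder.newJob(KnowledgeBaseUpdateJob.class)
                        .withIdentity("knowledgeBaseUpdateJob", "grootstream")
                        .build();
                Trigger trigger = TriggerBuilder.newTrigger()
                        .withIdentity("knowledgeBaseUpdateTrigger", "grootstream")
                        .startNow()
                        .withSchedule(SimpleScheduleBuilder.simpleSchedule()
                                .withIntervalInSeconds(300)
                                .repeatForever())
                        .build();
                scheduler.scheduleJob(job, trigger);
                scheduler.start();
            }
        } catch (SchedulerException e) {
            throw new RuntimeException(e);
        }
    }

    // Counterpart invoked from the UDF close() hooks; waits for a running job to finish.
    public static synchronized void shutdownScheduler() throws SchedulerException {
        if (scheduler != null) {
            scheduler.shutdown(true);
            scheduler = null;
        }
    }
}
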
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java
index ab23331..a2afad9 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java
@@ -5,6 +5,7 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException;
import com.geedgenetworks.core.pojo.UDFContext;
import com.geedgenetworks.core.udf.GeoIpLookup;
import com.geedgenetworks.core.utils.KnowlegdeBase.IpKnowledgeBase;
+import com.geedgenetworks.domain.LocationResponse;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -82,7 +83,7 @@ public class GeoIpLookupFunctionTest {
String latLngLookup = IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").latLngLookup("2600:1015:b002::");
String ispLookup = IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").ispLookup("2600:1015:b002::");
String infoLookupToJSONString = IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").infoLookupToJSONString("2600:1015:b002::");
- //String infoLookup = (String) IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").infoLookup("2600:1015:b002::");
+ LocationResponse infoLookup = (LocationResponse) IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").infoLookup("2600:1015:b002::");
System.out.println("countryLookup:" + countryLookup);
@@ -94,7 +95,7 @@ public class GeoIpLookupFunctionTest {
System.out.println("ispLookup:" + ispLookup);
System.out.println("infoLookupToJSONString:" + infoLookupToJSONString);
System.out.println("infoLookup:" + IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").infoLookup("2600:1015:b002::"));
- System.out.println("----------------------------");
+ System.out.println(infoLookup+"----------------------------");
// assertEquals("6167", asn);
diff --git a/pom.xml b/pom.xml
index d3677f5..7640ec4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -64,8 +64,7 @@
<lombok.version>1.18.24</lombok.version>
<config.version>1.3.3</config.version>
<hazelcast.version>5.1</hazelcast.version>
-
-
+ <quartz.version>2.3.2</quartz.version>
</properties>
<dependencyManagement>
@@ -378,7 +377,11 @@
</exclusions>
</dependency>
-
+ <dependency>
+ <groupId>org.quartz-scheduler</groupId>
+ <artifactId>quartz</artifactId>
+ <version>${quartz.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>