diff options
| author | wangkuan <[email protected]> | 2023-12-12 19:20:58 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2023-12-12 19:20:58 +0800 |
| commit | 49ed485e1b8817ca838613f2c548c75f086627cb (patch) | |
| tree | 4ff515cf5efe4136aa6fa5c61c65ac433ac20ff9 | |
| parent | 0452c73c0acbf1e4ac45e46f94350f0313a8d7a0 (diff) | |
[improve][core][common] 增加定时任务工具类,部分udf优化,tsg任务所有功能配置,grootstream全局参数修改(暂时只修改配置文件,没有去掉engine待讨论)
21 files changed, 537 insertions, 360 deletions
diff --git a/config/grootstream.yaml b/config/grootstream.yaml index a9e2791..22be603 100644 --- a/config/grootstream.yaml +++ b/config/grootstream.yaml @@ -1,39 +1,44 @@ grootstream: - engine: - http_con_pool: - max_total: 400 - max_per_route: 60 - connection_request_timeout: 60000 - connect_timeout: 60000 - socket_timeout: 60000 - knowledge_base: - - name: TSG_asnlookup - type: asnlookup - properties: - fs_type: hos - fs_default_path: http://path - files: - default_mmdb_file: 7ce2f9890950ba90-fcc25696bf11a8a0 - user_defined_mmdb_file: 7ce2f9890950ba90-71f13b3736863ddb - - name: CN_asnlookup - type: asnlookup - files: - default_mmdb_file: 7ce2f9890950ba90-fcc25696bf11a8a0 - user_defined_mmdb_file: 7ce2f9890950ba90-71f13b3736863ddb nacos: server_addr: 192.168.44.12:8848 username: nacos password: nacos - namespace: test - data_id: knowledge_base.json - group: DEFAULT_GROUP + + read_timeout: 5000 consul: server_addr: 192.168.41.30 server_port: 8500 token: c989b503-1d74-f49c-5698-e90fe461e938 - hdfs: - servers: 192.168.44.11:9000,192.168.44.14:9000 + knowledge_base: + - name: TSG_asnlookup + type: asnlookup + properties: + fs_type: hos + fs_default_path: http://path + files: + default_mmdb_file: 7ce2f9890950ba90-fcc25696bf11a8a0 + user_defined_mmdb_file: 7ce2f9890950ba90-71f13b3736863ddb + - name: CN_asnlookup + type: asnlookup + files: + default_mmdb_file: 7ce2f9890950ba90-fcc25696bf11a8a0 + user_defined_mmdb_file: 7ce2f9890950ba90-71f13b3736863ddb + properties: + hos.path: //path + hos.traffic.file.bucket: bucket +#grootstream.properties.hos_path + + + + + + + + + + + diff --git a/config/grootstream_job_template.yaml b/config/grootstream_job_template.yaml index c125ebb..ad96a07 100644 --- a/config/grootstream_job_template.yaml +++ b/config/grootstream_job_template.yaml @@ -6,8 +6,8 @@ sources: # watermark_timestamp_unit: ms # [string] Watermark Unit, default is ms # watermark_lag: 60 # [number] Watermark Lag, default is 60 properties: # [object] Source Properties - topic: SESSION-RECORD-COMPLETED - kafka.bootstrap.servers: 192.168.44.11:9092 + topic: SESSION-RECORD + kafka.bootstrap.servers: 192.168.44.12:9092 kafka.session.timeout.ms: 60000 kafka.max.poll.records: 3000 kafka.max.partition.fetch.bytes: 31457280 @@ -95,59 +95,99 @@ processing_pipelines: functions: # [array of object] Function List - function: ASN_LOOKUP - lookup_fields: [server_ip] - output_fields: [server_asn] + lookup_fields: [ server_ip ] + output_fields: [ server_asn ] + parameters: + option: IP_TO_ASN + vendor_id: tsg_asnlookup + + - function: ASN_LOOKUP + lookup_fields: [ client_ip ] + output_fields: [ client_asn ] parameters: option: IP_TO_ASN - vendor_id : tsg_asnlookup + vendor_id: tsg_asnlookup + - function: SNOWFLAKE_ID lookup_fields: [ '' ] - output_fields: [ common_log_id ] + output_fields: [ log_id ] filter: - - function: EVAL - output_fields: [ common_internal_ip ] - parameters: - value_expression: 'common_direction=69 ? common_client_ip : common_server_ip' - - function: EVAL - output_fields: [ common_external_ip ] - parameters: - value_expression: 'common_direction=73 ? common_client_ip : common_server_ip' + - function: JSON_EXTRACT - lookup_fields: [ common_device_tag ] - output_fields: [ common_data_center ] + lookup_fields: [ device_tag ] + output_fields: [ data_center ] filter: parameters: param: $.tags[?(@.tag=='data_center')][0].value + - function: JSON_EXTRACT - lookup_fields: [ common_device_tag ] - output_fields: [ common_device_group ] + lookup_fields: [ device_tag ] + output_fields: [ device_group ] filter: parameters: param: $.tags[?(@.tag=='device_group')][0].value + - function: UNIX_TIMESTAMP_FUNCTION - output_fields: [ common_processing_time ] + output_fields: [ processing_time ] parameters: precision: seconds + + - function: UNIX_TIMESTAMP_CONVERTER + lookup_fields: [ __timestamp ] + output_fields: [ recv_time ] + parameters: + precision: seconds + - function: EVAL - output_fields: [ common_recv_time ] + output_fields: [ recv_time ] parameters: - value_expression: 'common_ingestion_time' + value_expression: 'ingestion_time' + - function: DOMAIN - lookup_fields: [ http_host,ssl_sni,quic_sni ] - output_fields: [ common_server_domain ] + lookup_fields: [ http_host,ssl_sni,dtls_sni,quic_sni ] + output_fields: [ server_domain ] parameters: option: FIRST_SIGNIFICANT_SUBDOMAIN - - function: EVAL - output_fields: [ http_domain ] - parameters: - value_expression: 'common_server_domain' + - function: BASE64_DECODE_TO_STRING lookup_fields: [ mail_subject,mail_subject_charset ] output_fields: [ mail_subject ] + - function: BASE64_DECODE_TO_STRING lookup_fields: [ mail_attachment_name,mail_attachment_name_charset ] output_fields: [ mail_attachment_name ] + - function: PATH_COMBINE + lookup_fields: [ packet_capture_file ] + output_fields: [ packet_capture_file ] + parameters: + path: [ global.hos_path, global.pcap_file_bucket_name, packet_capture_file ] + + - function: PATH_COMBINE + lookup_fields: [ rtp_pcap_path ] + output_fields: [ rtp_pcap_path ] + parameters: + path: [ global.hos_path, global.pcap_file_bucket_name, rtp_pcap_path ] + + - function: PATH_COMBINE + lookup_fields: [ http_request_body ] + output_fields: [ http_request_body ] + parameters: + path: [ global.hos_path, global.traffic_file_bucket, http_request_body ] + + - function: PATH_COMBINE + lookup_fields: [ http_response_body ] + output_fields: [ http_response_body ] + parameters: + path: [ global.hos_path, global.traffic_file_bucket, http_response_body ] + + - function: PATH_COMBINE + lookup_fields: [ mail_eml_file ] + output_fields: [ mail_eml_file ] + parameters: + path: [ global.hos_path, global.traffic_file_bucket, mail_eml_file ] + + sinks: kafka_sink_a: type: kafka @@ -195,11 +235,11 @@ application: # [object] Application Configuration env: # [object] Environment Variables execution.parallelism: 1 # [number] Job-Level Parallelism topology: # [array of object] Node List. It will be used build data flow for job dag graph. - - name: inline_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE. + - name: kafka_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE. parallelism: 1 # [number] Operator-Level Parallelism. - downstream: [schema_type_filter] # [array of string] Downstream Node Name List. - - name: schema_type_filter - parallelism: 1 + # downstream: [schema_type_filter] # [array of string] Downstream Node Name List. + #- name: schema_type_filter + # parallelism: 1 downstream: [session_record_processor] - name: session_record_processor parallelism: 1 diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/EngineConfig.java b/groot-common/src/main/java/com/geedgenetworks/common/config/EngineConfig.java index 3591ce6..a286086 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/EngineConfig.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/EngineConfig.java @@ -4,6 +4,7 @@ import lombok.Data; import java.io.Serializable; import java.util.List; +import java.util.Map; import static com.google.common.base.Preconditions.checkNotNull; @@ -20,6 +21,7 @@ public class EngineConfig implements Serializable { private ConsulConfig consulConfig = ServerConfigOptions.CONSUL.defaultValue(); private ZookeeperConfig zookeeperConfig = ServerConfigOptions.ZOOKEEPER.defaultValue(); private HdfsConfig hdfsConfig = ServerConfigOptions.HDFS.defaultValue(); + private Map<String,String> propertiesConfig = ServerConfigOptions.PROPERTIES.defaultValue(); public void setKnowledgeBaseConfig(List<KnowledgeConfig> knowledgeBaseConfig) { diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/ServerConfigOptions.java b/groot-common/src/main/java/com/geedgenetworks/common/config/ServerConfigOptions.java index 04bc7ec..2c03b21 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/ServerConfigOptions.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/ServerConfigOptions.java @@ -197,6 +197,10 @@ public class ServerConfigOptions { .defaultValue(new HdfsConfig()) .withDescription("The hdfs configuration."); - + public static final Option<Map<String, String>> PROPERTIES = + Options.key("properties") + .mapType() + .defaultValue(new HashMap<String,String>()) + .withDescription("The properties of grootstream"); } diff --git a/groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamDomConfigProcessor.java b/groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamDomConfigProcessor.java index ccdd58a..843863f 100644 --- a/groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamDomConfigProcessor.java +++ b/groot-common/src/main/java/com/geedgenetworks/common/config/YamlGrootStreamDomConfigProcessor.java @@ -23,7 +23,7 @@ public class YamlGrootStreamDomConfigProcessor extends AbstractDomConfigProcesso this.config = config; } - @Override +/* @Override public void buildConfig(Node rootNode) { for (Node node : childElements(rootNode)) { String nodeName = cleanNodeName(node); @@ -38,18 +38,20 @@ public class YamlGrootStreamDomConfigProcessor extends AbstractDomConfigProcesso occurrenceSet.add(nodeName); } } - } + }*/ - private boolean handleNode(Node node, String name) { - if (GrootStreamConfigSections.ENGINE.isEqual(name)) { +/* private boolean handleNode(Node node, String name) { + if (GrootStreamConfigSections.GROOTSTREAM.isEqual(name)) { parseEngineConfig(node, config); } else { return true; } + parseEngineConfig(node, config); + return false; - } + }*/ - private void parseEngineConfig(Node engineNode, GrootStreamConfig config) { + public void buildConfig(Node engineNode) { final EngineConfig engineConfig = config.getEngineConfig(); for (Node node : childElements(engineNode)) { String name = cleanNodeName(node); @@ -65,6 +67,8 @@ public class YamlGrootStreamDomConfigProcessor extends AbstractDomConfigProcesso engineConfig.setZookeeperConfig(parseZookeeperConfig(node)); } else if (ServerConfigOptions.HDFS.key().equals(name)) { engineConfig.setHdfsConfig(parseHdfsConfig(node)); + } else if (ServerConfigOptions.PROPERTIES.key().equals(name)) { + engineConfig.setPropertiesConfig(parsePropertiesConfig(node)); } else { LOGGER.warning("Unrecognized configuration element: " + name); @@ -73,6 +77,16 @@ public class YamlGrootStreamDomConfigProcessor extends AbstractDomConfigProcesso } } + private Map<String, String> parsePropertiesConfig(Node properties) { + + Map<String, String> propertiesMap = new HashMap<>(); + for (Node node : childElements(properties)) { + String name = cleanNodeName(node); + propertiesMap.put(name,getTextContent(node)); + } + return propertiesMap; + } + private ZookeeperConfig parseZookeeperConfig(Node zookeeperNode) { ZookeeperConfig zookeeperConfig = new ZookeeperConfig(); diff --git a/groot-common/src/main/resources/groot-platform-plugin b/groot-common/src/main/resources/groot-platform-plugin index bebf1e9..d22c057 100644 --- a/groot-common/src/main/resources/groot-platform-plugin +++ b/groot-common/src/main/resources/groot-platform-plugin @@ -6,4 +6,6 @@ com.geedgenetworks.core.udf.JsonExtract com.geedgenetworks.core.udf.UnixTimestamp com.geedgenetworks.core.udf.Domain com.geedgenetworks.core.udf.DecodeBase64 -com.geedgenetworks.core.udf.GeoIpLookup
\ No newline at end of file +com.geedgenetworks.core.udf.GeoIpLookup +com.geedgenetworks.core.udf.PathCombine +com.geedgenetworks.core.udf.UnixTimestampConverter diff --git a/groot-common/src/main/resources/grootstream.yaml b/groot-common/src/main/resources/grootstream.yaml index d0906bc..7ce881c 100644 --- a/groot-common/src/main/resources/grootstream.yaml +++ b/groot-common/src/main/resources/grootstream.yaml @@ -1,53 +1,25 @@ grootstream: - engine: - tick_tuple_freq_secs: 90 - gtpc_scan_max_rows: 10000 - radius_scan_max_rows: 10000 - radius_table_name: RADIUS-TABLE - gtpc_table_name: GTPC-TABLE - hbase_rpc_timeout: 60000 - http_con_pool: - max_total: 400 - max_per_route: 60 - connection_request_timeout: 60000 - connect_timeout: 60000 - socket_timeout: 60000 - knowledge_base: - - name: tsg_asnlookup - type: asnlookup - properties: - fs_type: hos - fs_default_path: http://path - files: - default_mmdb_file: 7ce2f9890950ba90-fcc25696bf11a8a0 - user_defined_mmdb_file: 7ce2f9890950ba90-71f13b3736863ddb - - name: tsg_geoiplookup - type: geoiplookup - files: - default_mmdb_file: 7ce2f9890950ba90-fcc25696bf11a8a0 - user_defined_mmdb_file: 7ce2f9890950ba90-71f13b3736863ddb - - snowflake: - data_center_id_num: 1 - nacos: - server_addr: 192.168.44.12:8848 - username: nacos - password: nacos - namespace: test - data_id: knowledge_base.json - group: DEFAULT_GROUP - read_timeout: 5000 - consul: - server_addr: 192.168.41.30 - server_port: 8500 - token: c989b503-1d74-f49c-5698-e90fe461e938 - hos: - token: c21f969b5f03d33d43e04f8f136e7682 - zookeeper: - quorum: 192.168.44.12:2181 - hdfs: - servers: 192.168.44.11:9000,192.168.44.14:9000 - - - + http_con_pool: + max_total: 400 + max_per_route: 60 + connection_request_timeout: 60000 + connect_timeout: 60000 + socket_timeout: 60000 + knowledge_base: + - name: tsg_asnlookup + type: asnlookup + properties: + fs_type: hos + fs_default_path: http://path + files: + 7ce2f9890950ba90-fcc25696bf11a8a0 + 7ce2f9890950ba90-71f13b3736863ddb + - name: tsg_geoiplookup + type: geoiplookup + files: + 7ce2f9890950ba90-fcc25696bf11a8a0 + 7ce2f9890950ba90-71f13b3736863ddb + properties: + hos.path: //path + hos.traffic.file.bucket: bucket
\ No newline at end of file diff --git a/groot-core/src/main/java/com/geedgenetworks/core/pojo/KnowledgeBaseEntity.java b/groot-core/src/main/java/com/geedgenetworks/core/pojo/KnowledgeBaseEntity.java new file mode 100644 index 0000000..523157c --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/pojo/KnowledgeBaseEntity.java @@ -0,0 +1,18 @@ +package com.geedgenetworks.core.pojo; + +import com.geedgenetworks.common.config.KnowledgeConfig; +import com.geedgenetworks.core.utils.KnowlegdeBase.AbstractKnowledgeBase; +import lombok.Data; + +import java.util.List; +@Data +public class KnowledgeBaseEntity { + + private List<KnowLedgeEntity> knowLedgeEntityList; + private KnowledgeConfig knowledgeConfig; + private AbstractKnowledgeBase abstractKnowledgeBase; + + + + +} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java index a81bbd3..5a0637d 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/processor/projection/ProjectionProcessFunction.java @@ -10,7 +10,7 @@ import com.geedgenetworks.core.pojo.Event; import com.geedgenetworks.core.pojo.ProjectionConfig; import com.geedgenetworks.core.pojo.UDFContext; import com.geedgenetworks.core.udf.UDF; -import com.geedgenetworks.core.utils.KnowlegdeBase.KnowledgeBaseSchedule; +import com.geedgenetworks.core.utils.KnowlegdeBase.KnowledgeBaseUpdateJob; import com.googlecode.aviator.AviatorEvaluator; import com.googlecode.aviator.AviatorEvaluatorInstance; import com.googlecode.aviator.Expression; @@ -18,9 +18,12 @@ import com.googlecode.aviator.Options; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; +import org.quartz.SchedulerException; import java.util.LinkedList; +import static com.geedgenetworks.core.utils.SchedulerUtils.shutdownScheduler; + public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { private static final Log logger = LogFactory.get(); @@ -37,7 +40,7 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { this.functions = new LinkedList<>(); try { - KnowledgeBaseSchedule.getInstance(); + // KnowledgeBaseUpdateJob.getInstance(); for (UDFContext udfContext : projectionConfig.getFunctions()) { Expression compiledExp = null; UdfEntity udfEntity = new UdfEntity(); @@ -121,6 +124,11 @@ public class ProjectionProcessFunction extends ProcessFunction<Event, Event> { udfEntity.getUdfFunction().close(); } + try { + shutdownScheduler(); + } catch (SchedulerException e) { + throw new RuntimeException(e); + } super.close(); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java index 14897ff..fd767cc 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/AsnLookup.java @@ -12,14 +12,18 @@ import com.geedgenetworks.core.utils.KnowlegdeBase.AsnKnowledgeBase; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.geedgenetworks.core.utils.KnowlegdeBase.KnowledgeBaseSchedule; +import com.geedgenetworks.core.utils.KnowlegdeBase.KnowledgeBaseUpdateJob; +import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; +import org.quartz.SchedulerException; +import static com.geedgenetworks.core.utils.SchedulerUtils.shutdownScheduler; + +@Slf4j public class AsnLookup implements UDF { private UDFContext udfContext; - private static final Log logger = LogFactory.get(); private String vender; private String option; @@ -30,18 +34,7 @@ public class AsnLookup implements UDF { this.udfContext = udfContext; this.vender = udfContext.getParameters().get("vendor_id").toString(); this.option = udfContext.getParameters().get("option").toString(); - Configuration configuration = (Configuration) runtimeContext - .getExecutionConfig().getGlobalJobParameters(); - EngineConfig engineConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), EngineConfig.class); - for(KnowledgeConfig knowledgeConfig : engineConfig.getKnowledgeBaseConfig()){ - if(vender.equals(knowledgeConfig.getName())){ - KnowledgeBaseSchedule.initKnowledgeBase(knowledgeConfig); - } - else { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param vendor_id value is not correct"); - } - } - + KnowledgeBaseUpdateJob.initKnowledgeBase(vender,AsnKnowledgeBase.getInstance(), runtimeContext); } @@ -78,6 +71,11 @@ public class AsnLookup implements UDF { @Override public void close() { + try { + shutdownScheduler(); + } catch (SchedulerException e) { + throw new RuntimeException(e); + } } private void checkUdfContext(UDFContext udfContext) { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java index 6a5ea60..128eea8 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/DecodeBase64.java @@ -8,14 +8,15 @@ import com.geedgenetworks.utils.StringUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; import java.io.UnsupportedEncodingException; import java.util.Base64; +@Slf4j public class DecodeBase64 implements UDF { - private static final Log logger = LogFactory.get(); private UDFContext udfContext; @Override @@ -51,9 +52,9 @@ public class DecodeBase64 implements UDF { } } } catch (RuntimeException e) { - logger.error("Resolve Base64 exception, exception information:" + e.getMessage()); + log.error("Resolve Base64 exception, exception information:" + e.getMessage()); } catch (UnsupportedEncodingException e) { - logger.error( + log.error( "The Character Encoding [" + charset.toString() + "] is not supported.exception information:" diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java index ffcbedb..e7a5c37 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/Domain.java @@ -7,15 +7,16 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.core.pojo.Event; import com.geedgenetworks.core.pojo.UDFContext; import com.geedgenetworks.utils.FormatUtils; +import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; +@Slf4j public class Domain implements UDF { private UDFContext udfContext; private String option; - private static final Log logger = LogFactory.get(); @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { @@ -35,7 +36,8 @@ public class Domain implements UDF { } else{ if(!udfContext.getParameters().get("option").toString().equals("TOP_LEVEL_DOMAIN") && - !udfContext.getParameters().get("option").toString().equals("FIRST_SIGNIFICANT_SUBDOMAIN")){ + !udfContext.getParameters().get("option").toString().equals("FIRST_SIGNIFICANT_SUBDOMAIN") && + !udfContext.getParameters().get("option").toString().equals("FQDN")){ throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters option value is not correct"); } } @@ -68,7 +70,7 @@ public class Domain implements UDF { event.getExtractedFields() .get(lookupField)); break; - default: + case "FQDN": domain = (String) event.getExtractedFields() .get(lookupField); break; diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java index 4f09969..30de35d 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/GeoIpLookup.java @@ -8,42 +8,44 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.core.pojo.Event; import com.geedgenetworks.core.pojo.UDFContext; import com.geedgenetworks.core.utils.KnowlegdeBase.IpKnowledgeBase; -import com.geedgenetworks.core.utils.KnowlegdeBase.KnowledgeBaseSchedule; +import com.geedgenetworks.core.utils.KnowlegdeBase.KnowledgeBaseUpdateJob; +import com.geedgenetworks.domain.LocationResponse; import com.geedgenetworks.utils.StringUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.TypeReference; +import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; +import org.quartz.SchedulerException; import java.util.HashMap; +import java.util.Map; +import static com.geedgenetworks.core.utils.SchedulerUtils.shutdownScheduler; + +@Slf4j public class GeoIpLookup implements UDF { private UDFContext udfContext; - private static final Log logger = LogFactory.get(); private String vender; private String option; + private Map<String,String> geolocation_field_mapping; + @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { checkUdfContext(udfContext); this.udfContext = udfContext; this.vender = udfContext.getParameters().get("vendor_id").toString(); this.option = udfContext.getParameters().get("option").toString(); - Configuration configuration = (Configuration) runtimeContext - .getExecutionConfig().getGlobalJobParameters(); - EngineConfig engineConfig = com.alibaba.fastjson.JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), EngineConfig.class); - for(KnowledgeConfig knowledgeConfig : engineConfig.getKnowledgeBaseConfig()){ - if(vender.equals(knowledgeConfig.getName())){ - KnowledgeBaseSchedule.initKnowledgeBase(knowledgeConfig); - } - else { - throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "The param vendor_id value is not correct"); - } + if(option.equals("IP_TO_OBJECT")){ + this.geolocation_field_mapping = (Map<String, String>) udfContext.getParameters().get("geolocation_field_mapping"); } + KnowledgeBaseUpdateJob.initKnowledgeBase(vender,IpKnowledgeBase.getInstance(),runtimeContext); + } @@ -134,13 +136,10 @@ public class GeoIpLookup implements UDF { .getLookup_fields() .get(0)) .toString()); - String[] latLng = geo.split(","); - if (latLng.length == 2) { - event.getExtractedFields() - .put(udfContext.getOutput_fields().get(0), latLng[0]); + event.getExtractedFields() - .put(udfContext.getOutput_fields().get(1), latLng[1]); - } + .put(udfContext.getOutput_fields().get(0), geo); + break; case "IP_TO_PROVIDER": HashMap<String, Object> serverIpMap = @@ -176,21 +175,64 @@ public class GeoIpLookup implements UDF { .toString())); break; case "IP_TO_OBJECT": - event.getExtractedFields() - .put( - udfContext.getOutput_fields().get(0), - IpKnowledgeBase.getVenderWithIpLookup() - .get(vender) - .infoLookup( - event.getExtractedFields() - .get( - udfContext - .getLookup_fields() - .get(0)) - .toString())); - break; - default: - break; + + LocationResponse response = (LocationResponse) IpKnowledgeBase.getVenderWithIpLookup() + .get(vender) + .infoLookup( + event.getExtractedFields() + .get( + udfContext + .getLookup_fields() + .get(0)) + .toString()); + + for (Map.Entry<String, String> entry : geolocation_field_mapping.entrySet()) { + switch (entry.getKey()) { + case "COUNTRY": + event.getExtractedFields() + .put(entry.getValue(), response.getCountry()); + break; + case "PROVINCE": + event.getExtractedFields() + .put(entry.getValue(), response.getSuperAdministrativeArea()); + break; + case "CITY": + event.getExtractedFields() + .put(entry.getValue(), response.getAdministrativeArea()); + break; + case "LONGITUDE": + event.getExtractedFields() + .put(entry.getValue(), response.getLongitude()); + break; + case "LATITUDE": + event.getExtractedFields() + .put(entry.getValue(), response.getLatitude()); + break; + case "ISP": + event.getExtractedFields() + .put(entry.getValue(), response.getIsp()); + break; + case "ORGANIZATION ": + event.getExtractedFields() + .put(entry.getValue(), response.getOrganization()); + break; + default: + break; + } + event.getExtractedFields() + .put( + udfContext.getOutput_fields().get(0), + IpKnowledgeBase.getVenderWithIpLookup() + .get(vender) + .infoLookup( + event.getExtractedFields() + .get( + udfContext + .getLookup_fields() + .get(0)) + .toString())); + break; + } } } return event; @@ -231,7 +273,22 @@ public class GeoIpLookup implements UDF { throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters need containkey geolocation_field_mapping"); } + else { + Map<String,String> geolocation_field_mapping =(Map<String, String>) udfContext.getParameters().get("geolocation_field_mapping"); + + if(!geolocation_field_mapping.isEmpty()){ + for (Map.Entry<String, String> entry : geolocation_field_mapping.entrySet()) { + + if(!entry.getKey().equals("COUNTRY") && !entry.getKey().equals("PROVINCE") && !entry.getKey().equals("CITY") && !entry.getKey().equals("LONGITUDE") && !entry.getKey().equals("LATITUDE") && !entry.getKey().equals("ISP") && !entry.getKey().equals("ORGANIZATION")){ + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters geolocation_field_mapping value is not correct"); + } + } + } + else { + throw new GrootStreamRuntimeException(CommonErrorCode.ILLEGAL_ARGUMENT, "parameters geolocation_field_mapping value is not correct"); + } + } } } @@ -245,6 +302,10 @@ public class GeoIpLookup implements UDF { @Override public void close() { - + try { + shutdownScheduler(); + } catch (SchedulerException e) { + throw new RuntimeException(e); + } } } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java index 6af2741..059088d 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/PathCombine.java @@ -5,11 +5,12 @@ import com.geedgenetworks.common.Constants; import com.geedgenetworks.common.config.EngineConfig; import com.geedgenetworks.core.pojo.Event; import com.geedgenetworks.core.pojo.UDFContext; +import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; import java.util.*; - +@Slf4j public class PathCombine implements UDF { private UDFContext udfContext; @@ -18,28 +19,36 @@ public class PathCombine implements UDF { private Map<String, String> pathParameters = new LinkedHashMap<>(); + + + @Override public void open(RuntimeContext runtimeContext, UDFContext udfContext) { this.udfContext = udfContext; -/* + Configuration configuration = (Configuration) runtimeContext .getExecutionConfig().getGlobalJobParameters(); EngineConfig engineConfig = JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), EngineConfig.class); -*/ - + Map<String,String> propertiesConfig =engineConfig.getPropertiesConfig(); if (udfContext.getParameters() != null && !udfContext.getParameters().isEmpty()) { Object object = udfContext.getParameters().get("path"); - ArrayList<Object> arrayList = new ArrayList<>(Arrays.asList(object)); + ArrayList<Object> arrayList = new ArrayList<>(Collections.singletonList(object)); for (Object key : arrayList) { String column =key.toString(); - if(column.contains("global")){ - pathParameters.put(column,"");//待定义全局变量 + if(column.startsWith("grootstream.properties")){ + String propertiesConfigKey = column.replaceAll("grootstream.properties.","").trim(); + if(propertiesConfig.containsKey(propertiesConfigKey)) { + pathParameters.put(column,propertiesConfig.get(propertiesConfigKey));//待定义全局变量 + } + else { + throw new RuntimeException("propertiesConfigKey "+propertiesConfigKey+" not found "); + } } else { - pathParameters.put(column,"dynamic"); + pathParameters.put(column,""); } } @@ -61,7 +70,7 @@ public class PathCombine implements UDF { public Event evaluate(Event event) { StringBuilder stringBuilder = new StringBuilder(); for (Map.Entry<String, String> entry : pathParameters.entrySet()) { - if (entry.getValue().equals("dynamic")) { + if (entry.getValue().isEmpty()) { stringBuilder.append(event.getExtractedFields().getOrDefault(entry.getKey(),"").toString()); } else { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java index c0ad7f1..626de27 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/UnixTimestampConverter.java @@ -74,7 +74,7 @@ public class UnixTimestampConverter implements UDF { @Override public String functionName() { - return "UNIX_TIMESTAMP_FUNCTION"; + return "UNIX_TIMESTAMP_CONVERTER"; } @Override diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java index 00b2b53..2c0137e 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AbstractKnowledgeBase.java @@ -1,5 +1,6 @@ package com.geedgenetworks.core.utils.KnowlegdeBase; +import com.alibaba.fastjson2.JSON; import com.geedgenetworks.common.config.KnowledgeConfig; import com.geedgenetworks.common.utils.HttpClientUtils; import com.geedgenetworks.core.pojo.KnowLedgeEntity; @@ -60,15 +61,23 @@ public abstract class AbstractKnowledgeBase { // HttpClientUtils httpClientUtils = new HttpClientUtils(); // String metadate = httpClientUtils.httpGet(URI.create("/v1/knowledge_base/"+ id +"/meta")) ; //临时写死 - KnowLedgeEntity knowLedgeEntity = new KnowLedgeEntity(); - knowLedgeEntity.setIsValid(1); - knowLedgeEntity.setSha256("c06db87e9a914a8296ca892880c353884ac15b831c485c29fad9889ffb4e0fa8"); - knowLedgeEntity.setPath("http://192.168.44.12:9098/hos/knowledge_base_hos_bucket/4bd16ddb-d1cb-4310-b620-02164335c50d-aXBfYnVpbHRpbg==.mmdb"); - /* knowLedgeEntity.setSha256("2a2defd9c96ea5434ac628d3dc4fc6790e30d84c218b02fe798dbc6772dbf76c"); - knowLedgeEntity.setPath("http://192.168.44.12:9098/hos/knowledge_base_hos_bucket/1e8aabe7-2170-445c-9124-6051332ca7d6-YXNuX2J1aWx0aW4=.mmdb");*/ - // List<KnowLedgeEntity> knowLedgeEntityList = JSON.parseObject(metadate, new TypeReference<List<KnowLedgeEntity>>() {}); - // return knowLedgeEntityList.get(0); - return knowLedgeEntity; + KnowLedgeEntity knowLedgeEntity1 = new KnowLedgeEntity(); + knowLedgeEntity1.setIsValid(1); + knowLedgeEntity1.setSha256("2a2defd9c96ea5434ac628d3dc4fc6790e30d84c218b02fe798dbc6772dbf76c"); + knowLedgeEntity1.setPath("http://192.168.44.12:9098/hos/knowledge_base_bucket/84ec56db-8f3b-440a-af79-15148d7cd3c8-YXNuX2J1aWx0aW4=.mmdb"); + +/* + KnowLedgeEntity knowLedgeEntity2 = new KnowLedgeEntity(); + knowLedgeEntity2.setIsValid(1); + knowLedgeEntity2.setSha256("044228128a1cc7ae0f8d4bae9fdc73eac9dea3e074e7c15796d29167b89cfdee"); + knowLedgeEntity2.setPath("http://192.168.44.12:9098/hos/knowledge_base_bucket/63220f3e-7792-43a8-9921-d01da16b76b2-aXBfYnVpbHRpbg==.mmdb"); +*/ + + /* knowLedgeEntity.setSha256("2a2defd9c96ea5434ac628d3dc4fc6790e30d84c218b02fe798dbc6772dbf76c"); + knowLedgeEntity.setPath("http://192.168.44.12:9098/hos/knowledge_base_hos_bucket/1e8aabe7-2170-445c-9124-6051332ca7d6-YXNuX2J1aWx0aW4=.mmdb"); + /* List<KnowLedgeEntity> knowLedgeEntityList = JSON.parseObject(metadate, new TypeReference<List<KnowLedgeEntity>>() {}); + return knowLedgeEntityList.get(0);*/ + return knowLedgeEntity1; } catch (Exception e) { logger.error("get file Metadata " + id + " error: " + e.getMessage()); } diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java index a62acf3..12b29ce 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/AsnKnowledgeBase.java @@ -16,13 +16,13 @@ import java.util.concurrent.ConcurrentHashMap; public class AsnKnowledgeBase extends AbstractKnowledgeBase { - private static final Logger LOG = LoggerFactory.getLogger(AsnKnowledgeBase.class); @Getter private static Map<String, IpLookupV2> venderWithAsnLookup = new ConcurrentHashMap<>(); private static final Logger logger = LoggerFactory.getLogger(AsnKnowledgeBase.class); private static AsnKnowledgeBase instance; // 私有构造函数,防止外部实例化 + public static synchronized AsnKnowledgeBase getInstance() { if (instance == null) { instance = new AsnKnowledgeBase(); diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseSchedule.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseSchedule.java deleted file mode 100644 index ccbf1a5..0000000 --- a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseSchedule.java +++ /dev/null @@ -1,163 +0,0 @@ -package com.geedgenetworks.core.utils.KnowlegdeBase; - -import com.geedgenetworks.common.config.CommonConfig; -import com.geedgenetworks.common.config.KnowledgeConfig; -import com.geedgenetworks.core.pojo.KnowLedgeEntity; -import lombok.Getter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.reflect.Constructor; -import java.lang.reflect.Method; -import java.security.NoSuchAlgorithmException; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; - - - -public class KnowledgeBaseSchedule { - - private static Timer timer; - - public static Map<String, KnowledgeConfig> knowledgeConfigMap = new ConcurrentHashMap<>(); - public static Map<String, List<KnowLedgeEntity>> KnowLedgeEntityMap = new ConcurrentHashMap<>(); - @Getter - private static Map<String, String> knowledgeBaseClassReflect= new ConcurrentHashMap<>() ; - private static final Logger logger = LoggerFactory.getLogger(KnowledgeBaseSchedule.class); - - private static KnowledgeBaseSchedule instance; - - public static synchronized KnowledgeBaseSchedule getInstance() { - if (instance == null) { - instance = new KnowledgeBaseSchedule(); - knowledgeBaseClassReflect =CommonConfig.KNOWLEDGE_BASE_CLASS_REFLECT; - ScheduTask.startTask(); - } - return instance; - } - - public static void updateKnowledgeBase(String name) { - - try { - // 获取私有构造方法 - Class<?> cls = Class.forName(knowledgeBaseClassReflect.get(knowledgeConfigMap.get(name).getType())); - Constructor<?> constructor = cls.getDeclaredConstructor(); - constructor.setAccessible(true); - // knowledgeUtil instance = (knowledgeUtil) constructor.newInstance(); - - Method createInstanceMethod = cls.getMethod("getInstance"); - AbstractKnowledgeBase instance = (AbstractKnowledgeBase) createInstanceMethod.invoke(null); - // 使用实例调用方法 - instance.updateKnowledgeBase(knowledgeConfigMap.get(name)); - } catch (Exception e) { - logger.error(e.getMessage()); - } - } - - - - public static synchronized void initKnowledgeBase(KnowledgeConfig KnowledgeConfig) { - - if (instance == null) { - getInstance(); - } - if (!knowledgeConfigMap.containsKey(KnowledgeConfig.getName())) { - List<KnowLedgeEntity> knowLedgeEntityList = new ArrayList<>(); - try { - knowledgeConfigMap.put(KnowledgeConfig.getName(), KnowledgeConfig); - updateKnowledgeBase(KnowledgeConfig.getName()); - for (String id : KnowledgeConfig.getFiles()) { - KnowLedgeEntity knowLedgeEntity = getMetadata(id); - knowLedgeEntityList.add(knowLedgeEntity); - } - KnowLedgeEntityMap.put(KnowledgeConfig.getName(), knowLedgeEntityList); - } catch (Exception e) { - logger.error("initKnowledgeBase " + KnowledgeConfig.getName() + " error: " + e.getMessage()); - } - KnowLedgeEntityMap.put(KnowledgeConfig.getName(), knowLedgeEntityList); - } - - - } - private static KnowLedgeEntity getMetadata(String id) { - - try{ -/* 暂时写死 - HttpClientUtils httpClientUtils = new HttpClientUtils(); - String metadate = httpClientUtils.httpGet(URI.create("/v1/knowledge_base/"+ id +"/meta")) ; - List<KnowLedgeEntity> knowLedgeEntityList = JSON.parseObject(metadate, new TypeReference<List<KnowLedgeEntity>>() {}); - return knowLedgeEntityList.get(0); -*/ - KnowLedgeEntity knowLedgeEntity = new KnowLedgeEntity(); - knowLedgeEntity.setIsValid(1); - knowLedgeEntity.setSha256("2a2defd9c96ea5434ac628d3dc4fc6790e30d84c218b02fe798dbc6772dbf76c"); - knowLedgeEntity.setPath("http://192.168.44.12:9098/hos/knowledge_base_hos_bucket/1e8aabe7-2170-445c-9124-6051332ca7d6-YXNuX2J1aWx0aW4=.mmdb"); - return knowLedgeEntity; - } catch (Exception e) { - logger.error("get file Metadata " + id + " error: " + e.getMessage()); - } - - return null; - } - - - public static synchronized Boolean checkUpdateMesssage (String name) throws NoSuchAlgorithmException { - - //暂时写死Boolean changeFlag = false; - Boolean changeFlag = true; - if(!KnowLedgeEntityMap.get(name).isEmpty()) { - - for (KnowLedgeEntity knowLedgeEntity : KnowLedgeEntityMap.get(name)) { - - if (!knowLedgeEntity.getSha256().equals(getMetadata(knowLedgeEntity.getId()).getSha256())) { - return true; - - } - } - } - else { - changeFlag = true; - } - return changeFlag; - - } - - - - - - - static class ScheduTask extends TimerTask { - - - @Override - public void run() { - for (Map.Entry<String, KnowledgeConfig> entry : knowledgeConfigMap.entrySet()) { - Boolean flag = null; - try { - flag = checkUpdateMesssage(entry.getValue().getName()); - if(flag){ - updateKnowledgeBase(entry.getKey()); - } - } catch (NoSuchAlgorithmException e) { - logger.error(e.getMessage()); - } catch (Exception e) { - logger.error(e.getMessage()); - } - } - } - - public static void startTask() { - timer = new Timer(); - timer.schedule(new ScheduTask(), 0, 300000); // 每隔300秒执行一次任务 - } - - public static void stopTask() { - if (timer != null) { - timer.cancel(); - timer = null; - } - } - } - -} diff --git a/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseUpdateJob.java b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseUpdateJob.java new file mode 100644 index 0000000..98b48a2 --- /dev/null +++ b/groot-core/src/main/java/com/geedgenetworks/core/utils/KnowlegdeBase/KnowledgeBaseUpdateJob.java @@ -0,0 +1,191 @@ +package com.geedgenetworks.core.utils.KnowlegdeBase; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.TypeReference; +import com.geedgenetworks.common.Constants; +import com.geedgenetworks.common.config.CommonConfig; +import com.geedgenetworks.common.config.EngineConfig; +import com.geedgenetworks.common.config.KnowledgeConfig; +import com.geedgenetworks.common.utils.HttpClientUtils; +import com.geedgenetworks.core.pojo.KnowLedgeEntity; +import com.geedgenetworks.core.pojo.KnowledgeBaseEntity; +import lombok.Getter; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.Configuration; +import org.quartz.Job; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; +import java.net.URI; +import java.security.NoSuchAlgorithmException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +import static com.geedgenetworks.core.utils.KnowlegdeBase.KnowledgeBaseScheduler.startTask; + + +public class KnowledgeBaseUpdateJob implements Job { + + +/* + public static Map<String, KnowledgeConfig> knowledgeConfigMap = new ConcurrentHashMap<>(); + public static Map<String, List<KnowLedgeEntity>> KnowLedgeEntityMap = new ConcurrentHashMap<>(); + +*/ + + @Getter + private static Map<String, KnowledgeBaseEntity> KnowLedgeBaseEntityMap= new ConcurrentHashMap<>() ; + private static final Logger logger = LoggerFactory.getLogger(KnowledgeBaseUpdateJob.class); + + private static KnowledgeBaseUpdateJob instance; + + private static EngineConfig engineConfig; + + public static synchronized KnowledgeBaseUpdateJob getInstance(RuntimeContext runtimeContext) { + if (instance == null) { + instance = new KnowledgeBaseUpdateJob(); + // knowledgeBaseClassReflect =CommonConfig.KNOWLEDGE_BASE_CLASS_REFLECT; + Configuration configuration = (Configuration) runtimeContext + .getExecutionConfig().getGlobalJobParameters(); + engineConfig = com.alibaba.fastjson.JSON.parseObject(configuration.toMap().get(Constants.SYSPROP_GROOTSTREAM_CONFIG), EngineConfig.class); + startTask(); + } + return instance; + } + + public static void updateKnowledgeBase(String name) { + + try { + // 获取私有构造方法 +/* Class<?> cls = Class.forName(knowledgeBaseClassReflect.get(knowledgeConfigMap.get(name).getType())); + Constructor<?> constructor = cls.getDeclaredConstructor(); + constructor.setAccessible(true); + Method createInstanceMethod = cls.getMethod("getInstance"); + AbstractKnowledgeBase instance = (AbstractKnowledgeBase) createInstanceMethod.invoke(null); + // 使用实例调用方法 + instance.updateKnowledgeBase(knowledgeConfigMap.get(name));*/ + + + } catch (Exception e) { + logger.error(e.getMessage()); + } + } + + + + public static synchronized void initKnowledgeBase(String name,AbstractKnowledgeBase abstractKnowledgeBase,RuntimeContext runtimeContext) { + + if (instance == null) { + getInstance(runtimeContext); + } + + if (!KnowLedgeBaseEntityMap.containsKey(name)) { + List<KnowLedgeEntity> knowLedgeEntityList = new ArrayList<>(); + KnowledgeBaseEntity knowledgeBaseEntity = new KnowledgeBaseEntity(); + knowledgeBaseEntity.setAbstractKnowledgeBase(abstractKnowledgeBase); + knowledgeBaseEntity.setKnowLedgeEntityList(knowLedgeEntityList); + + try { + for(KnowledgeConfig knowledgeConfig : engineConfig.getKnowledgeBaseConfig()){ + if(name.equals(knowledgeConfig.getName())){ + knowledgeBaseEntity.setKnowledgeConfig(knowledgeConfig); + break; + } + } + KnowLedgeBaseEntityMap.put(name, knowledgeBaseEntity); + for (String id : knowledgeBaseEntity.getKnowledgeConfig().getFiles()) { + // KnowLedgeEntity knowLedgeEntity = getMetadata(id); + KnowLedgeEntity knowLedgeEntity1 = new KnowLedgeEntity(); + knowLedgeEntity1.setIsValid(1); + knowLedgeEntity1.setSha256("2a2defd9c96ea5434ac628d3dc4fc6790e30d84c218b02fe798dbc6772dbf76c"); + knowLedgeEntity1.setPath("http://192.168.44.12:9098/hos/knowledge_base_bucket/84ec56db-8f3b-440a-af79-15148d7cd3c8-YXNuX2J1aWx0aW4=.mmdb"); + +/* + KnowLedgeEntity knowLedgeEntity2 = new KnowLedgeEntity(); + knowLedgeEntity2.setIsValid(1); + knowLedgeEntity2.setSha256("044228128a1cc7ae0f8d4bae9fdc73eac9dea3e074e7c15796d29167b89cfdee"); + knowLedgeEntity2.setPath("http://192.168.44.12:9098/hos/knowledge_base_bucket/63220f3e-7792-43a8-9921-d01da16b76b2-aXBfYnVpbHRpbg==.mmdb"); +*/ + + knowLedgeEntityList.add(knowLedgeEntity1); + } + + + updateKnowledgeBase(name); + } catch (Exception e) { + logger.error("initKnowledgeBase " + e.getMessage()); + } + } + + + } + private static KnowLedgeEntity getMetadata(String id) { + + try{ + HttpClientUtils httpClientUtils = new HttpClientUtils(); + String metadate = httpClientUtils.httpGet(URI.create("/v1/knowledge_base/"+ id +"/meta")) ; + List<KnowLedgeEntity> knowLedgeEntityList = JSON.parseObject(metadate, new TypeReference<List<KnowLedgeEntity>>() {}); + return knowLedgeEntityList.get(0); + + /* KnowLedgeEntity knowLedgeEntity = new KnowLedgeEntity(); + knowLedgeEntity.setIsValid(1); + knowLedgeEntity.setSha256("2a2defd9c96ea5434ac628d3dc4fc6790e30d84c218b02fe798dbc6772dbf76c"); + knowLedgeEntity.setPath("http://192.168.44.12:9098/hos/knowledge_base_hos_bucket/1e8aabe7-2170-445c-9124-6051332ca7d6-YXNuX2J1aWx0aW4=.mmdb"); + return knowLedgeEntity;*/ + } catch (Exception e) { + logger.error("get file Metadata " + id + " error: " + e.getMessage()); + } + + return null; + } + + + public static synchronized Boolean checkUpdateMesssage (String name) { + + System.out.println("check +++++++++++++++++++++++++++++++++++++"); + /* Boolean changeFlag = false; + if(!KnowLedgeEntityMap.get(name).isEmpty()) { + + for (KnowLedgeEntity knowLedgeEntity : KnowLedgeEntityMap.get(name)) { + + KnowLedgeEntity metadata =getMetadata(knowLedgeEntity.getId()); + if(metadata!=null) { + if (!knowLedgeEntity.getSha256().equals(metadata.getSha256())) { + return true; + } + } + else { + logger.error("get file Metadata " + knowLedgeEntity.getId() + " error " ); + } + } + } + else { + changeFlag = true; + }*/ + return true; + + } + + @Override + public void execute(JobExecutionContext jobExecutionContext) { + for (Map.Entry<String, KnowledgeBaseEntity> entry : KnowLedgeBaseEntityMap.entrySet()) { + Boolean flag = null; + try { + flag = checkUpdateMesssage(entry.getKey()); + if(flag){ + updateKnowledgeBase(entry.getKey()); + } + } catch (Exception e) { + logger.error(e.getMessage()); + } + } + } + + + + +} diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java index ab23331..a2afad9 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/test/GeoIpLookupFunctionTest.java @@ -5,6 +5,7 @@ import com.geedgenetworks.common.exception.GrootStreamRuntimeException; import com.geedgenetworks.core.pojo.UDFContext; import com.geedgenetworks.core.udf.GeoIpLookup; import com.geedgenetworks.core.utils.KnowlegdeBase.IpKnowledgeBase; +import com.geedgenetworks.domain.LocationResponse; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -82,7 +83,7 @@ public class GeoIpLookupFunctionTest { String latLngLookup = IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").latLngLookup("2600:1015:b002::"); String ispLookup = IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").ispLookup("2600:1015:b002::"); String infoLookupToJSONString = IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").infoLookupToJSONString("2600:1015:b002::"); - //String infoLookup = (String) IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").infoLookup("2600:1015:b002::"); + LocationResponse infoLookup = (LocationResponse) IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").infoLookup("2600:1015:b002::"); System.out.println("countryLookup:" + countryLookup); @@ -94,7 +95,7 @@ public class GeoIpLookupFunctionTest { System.out.println("ispLookup:" + ispLookup); System.out.println("infoLookupToJSONString:" + infoLookupToJSONString); System.out.println("infoLookup:" + IpKnowledgeBase.getVenderWithIpLookup().get("tsg_geoiplookup").infoLookup("2600:1015:b002::")); - System.out.println("----------------------------"); + System.out.println(infoLookup+"----------------------------"); // assertEquals("6167", asn); @@ -64,8 +64,7 @@ <lombok.version>1.18.24</lombok.version> <config.version>1.3.3</config.version> <hazelcast.version>5.1</hazelcast.version> - - + <quartz.version>2.3.2</quartz.version> </properties> <dependencyManagement> @@ -378,7 +377,11 @@ </exclusions> </dependency> - + <dependency> + <groupId>org.quartz-scheduler</groupId> + <artifactId>quartz</artifactId> + <version>${quartz.version}</version> + </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> |
