From 4992788387968d6763d90d06ee9f06e5faa58915 Mon Sep 17 00:00:00 2001 From: gujinkai Date: Mon, 29 Apr 2024 09:54:07 +0800 Subject: [CN-1650] fix: the range of tag is larger than expected at special env (cherry picked from commit 202d67e007b23db421f6e5ba077248f27208220f) --- .../IntelligenceIndicatorKnowledgeBaseHandler.java | 5 +- .../IpTagUserDefineKnowledgeBaseHandler.java | 5 +- .../udf/cn/IntelligenceIndicatorLookupTest.java | 21 ++- .../example/cn_grootstream_job_template.yaml | 198 ++++++++++++--------- 4 files changed, 140 insertions(+), 89 deletions(-) diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java index 716f72f..94fdae1 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java @@ -106,8 +106,9 @@ public class IntelligenceIndicatorKnowledgeBaseHandler extends AbstractSingleKno List currentTags = new ArrayList<>(tags); subRangeMap.put(Range.closed(startIpAddress, endIpAddress), currentTags); rangeListMap.forEach((ipAddressRange, ipAddressRangeTags) -> { - ipAddressRangeTags.addAll(tags); - subRangeMap.put(ipAddressRange, ipAddressRangeTags); + List newTags = new ArrayList<>(ipAddressRangeTags); + newTags.addAll(tags); + subRangeMap.put(ipAddressRange, newTags); }); newIpTagMap.putAll(subRangeMap); } else if ("Domain".equals(type)) { diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IpTagUserDefineKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IpTagUserDefineKnowledgeBaseHandler.java index 3b2f1e3..c33e1e0 100644 --- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IpTagUserDefineKnowledgeBaseHandler.java +++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IpTagUserDefineKnowledgeBaseHandler.java @@ -100,8 +100,9 @@ public class IpTagUserDefineKnowledgeBaseHandler extends AbstractMultipleKnowled nodes.add(node); subRangeMap.put(Range.closed(startIpAddress, endIpAddress), nodes); rangeListMap.forEach((ipAddressRange, ipAddressNode) -> { - ipAddressNode.add(new Node(tagValue, id)); - subRangeMap.put(ipAddressRange, ipAddressNode); + List newNodeList = new ArrayList<>(ipAddressNode); + newNodeList.add(new Node(tagValue, id)); + subRangeMap.put(ipAddressRange, newNodeList); }); treeRangeMap.putAll(subRangeMap); } catch (Exception lineException) { diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java index 804c7ca..34fef6b 100644 --- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java +++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java @@ -27,7 +27,7 @@ public class IntelligenceIndicatorLookupTest { void setUp() { runtimeContext = mockRuntimeContext(); - String content = "type,ip_addr_format,ip1,ip2,domain,tags\nIP,CIDR,116.178.65.0,25,ali.com,\"阿里1,云服务1\"\nDomain,CIDR,116.178.65.0,25,$ali.com,\"阿里2,云服务2\"\nDomain,CIDR,116.178.65.0,25,*baidu.com,\"阿里3,云服务3\""; + String content = "type,ip_addr_format,ip1,ip2,domain,tags\nIP,CIDR,116.178.65.0,25,ali.com,\"阿里1,云服务1\"\nDomain,CIDR,116.178.65.0,25,$ali.com,\"阿里2,云服务2\"\nDomain,CIDR,116.178.65.0,25,*baidu.com,\"阿里3,云服务3\"\nIP,Single,116.178.65.64,116.178.65.64,ali.com,\"test\""; mockKnowledgeBaseHandler(content); intelligenceIndicatorLookup = new IntelligenceIndicatorLookup(); @@ -136,6 +136,25 @@ public class IntelligenceIndicatorLookupTest { assertEquals(Arrays.asList("阿里3", "云服务3"), evaluate.getExtractedFields().get("domain_tags")); } + @Test + void evaluate6() { + UDFContext udfContext = new UDFContext(); + Map parameters = new HashMap<>(); + parameters.put("kb_name", kbName); + parameters.put("option", "IP_TO_TAG"); + udfContext.setParameters(parameters); + udfContext.setLookup_fields(Collections.singletonList("server_ip")); + udfContext.setOutput_fields(Collections.singletonList("server_ip_tags")); + intelligenceIndicatorLookup.open(runtimeContext, udfContext); + + Event event = new Event(); + Map fields = new HashMap<>(); + fields.put("server_ip", "116.178.65.64"); + event.setExtractedFields(fields); + Event evaluate1 = intelligenceIndicatorLookup.evaluate(event); + assertEquals(Arrays.asList("阿里1", "云服务1", "test"), evaluate1.getExtractedFields().get("server_ip_tags")); + } + @AfterEach void afterAll() { clearState(); diff --git a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml index 1e4224f..251a0ee 100644 --- a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml +++ b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml @@ -3,25 +3,17 @@ sources: type: kafka # fields: # [array of object] Field List, if not set, all fields(Map) will be output. properties: # [object] Source Properties - topic: SESSION-RECORD - kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094 + topic: SESSION-RECORD-PROCESSED + kafka.bootstrap.servers: { { tsg_olap_kafka_servers } } kafka.session.timeout.ms: 60000 kafka.max.poll.records: 3000 kafka.max.partition.fetch.bytes: 31457280 kafka.security.protocol: SASL_PLAINTEXT - kafka.ssl.keystore.location: - kafka.ssl.keystore.password: - kafka.ssl.truststore.location: - kafka.ssl.truststore.password: - kafka.ssl.key.password: kafka.sasl.mechanism: PLAIN - kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019"; - kafka.buffer.memory: - kafka.group.id: 44.55-test + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 + kafka.group.id: etl_processed_session_record_kafka_to_cn_kafka kafka.auto.offset.reset: latest - kafka.max.request.size: - kafka.compression.type: none - format: json # [string] Data Format, default is json + format: json processing_pipelines: session_record_processor: # [object] Processing Pipeline @@ -29,19 +21,6 @@ processing_pipelines: remove_fields: output_fields: functions: # [array of object] Function List - - function: SNOWFLAKE_ID - lookup_fields: [ '' ] - output_fields: [ log_id ] - filter: - parameters: - data_center_id_num: 1 - - - function: UNIX_TIMESTAMP_CONVERTER - lookup_fields: [ __timestamp ] - output_fields: [ recv_time ] - parameters: - precision: seconds - - function: EVAL output_fields: [ domain ] parameters: @@ -187,19 +166,100 @@ processing_pipelines: kb_name: none #kb_name: cn_internal_ip - - function: CN_VPN_LOOKUP - lookup_fields: [ server_ip ] - output_fields: [ server_vpn_service_name ] + - function: EVAL + output_fields: [ sent_bytes ] parameters: - kb_name: cn_vpn_learning_ip - option: IP_TO_VPN + value_expression: "sent_bytes == null ? 0 : sent_bytes" - - function: CN_VPN_LOOKUP - lookup_fields: [ domain ] - output_fields: [ domain_vpn_service_name ] + - function: EVAL + output_fields: [ sent_pkts ] parameters: - kb_name: cn_vpn_learning_domain - option: DOMAIN_TO_VPN + value_expression: "sent_pkts == null ? 0 : sent_pkts" + + - function: EVAL + output_fields: [ received_bytes ] + parameters: + value_expression: "received_bytes == null ? 0 : received_bytes" + + - function: EVAL + output_fields: [ received_pkts ] + parameters: + value_expression: "received_pkts == null ? 0 : received_pkts" + + - function: EVAL + output_fields: [ traffic_inbound_byte ] + parameters: + value_expression: "client_zone == 'internal' && server_zone == 'external' ? received_bytes : traffic_inbound_byte" + + - function: EVAL + output_fields: [ traffic_outbound_byte ] + parameters: + value_expression: "client_zone == 'external' && server_zone == 'internal' ? received_bytes : traffic_outbound_byte" + + - function: EVAL + output_fields: [ traffic_inbound_pkt ] + parameters: + value_expression: "client_zone == 'internal' && server_zone == 'external' ? received_pkts : traffic_inbound_pkt" + + - function: EVAL + output_fields: [ traffic_outbound_pkt ] + parameters: + value_expression: "client_zone == 'external' && server_zone == 'internal' ? received_pkts : traffic_outbound_pkt" + + - function: EVAL + output_fields: [ traffic_outbound_byte ] + parameters: + value_expression: "client_zone == 'internal' && server_zone == 'external' ? sent_bytes : traffic_outbound_byte" + + - function: EVAL + output_fields: [ traffic_inbound_byte ] + parameters: + value_expression: "client_zone == 'external' && server_zone == 'internal' ? sent_bytes : traffic_inbound_byte" + + - function: EVAL + output_fields: [ traffic_outbound_pkt ] + parameters: + value_expression: "client_zone == 'internal' && server_zone == 'external' ? sent_pkts : traffic_outbound_pkt" + + - function: EVAL + output_fields: [ traffic_inbound_pkt ] + parameters: + value_expression: "client_zone == 'external' && server_zone == 'internal' ? sent_pkts : traffic_inbound_pkt" + + - function: EVAL + output_fields: [ traffic_internal_byte ] + parameters: + value_expression: "client_zone == 'internal' && server_zone == 'internal' ? sent_bytes + received_bytes : traffic_internal_byte" + + - function: EVAL + output_fields: [ traffic_internal_pkt ] + parameters: + value_expression: "client_zone == 'internal' && server_zone == 'internal' ? sent_pkts + received_pkts : traffic_internal_pkt" + + - function: EVAL + output_fields: [ traffic_through_byte ] + parameters: + value_expression: "client_zone == 'external' && server_zone == 'external' ? sent_bytes + received_bytes : traffic_through_byte" + + - function: EVAL + output_fields: [ traffic_through_pkt ] + parameters: + value_expression: "client_zone == 'external' && server_zone == 'external' ? sent_pkts + received_pkts : traffic_through_pkt" + + - function: EVAL + output_fields: [ sessions ] + parameters: + value_expression: "1" + + - function: EVAL + output_fields: [ internal_query_num ] + parameters: + value_expression: "client_zone == 'internal' ? sessions : internal_query_num" + + - function: EVAL + output_fields: [ external_query_num ] + parameters: + value_expression: "client_zone == 'external' ? sessions : external_query_num" - function: CN_ANONYMITY_LOOKUP lookup_fields: [ server_ip ] @@ -255,48 +315,24 @@ processing_pipelines: output_fields: [ client_ip_tags ] - function: GENERATE_STRING_ARRAY - lookup_fields: [ server_idc_renter,server_dns_server,server_node_type,server_malware,server_vpn_service_name,server_ip_tags ] + lookup_fields: [ server_idc_renter,server_dns_server,server_node_type,server_malware,server_ip_tags ] output_fields: [ server_ip_tags ] - function: GENERATE_STRING_ARRAY - lookup_fields: [ domain_node_type,domain_malware,domain_vpn_service_name,domain_tags ] - output_fields: [ domain_tags ] - - - function: CN_ARRAY_ELEMENTS_PREPEND - lookup_fields: [ client_ip_tags ] - output_fields: [ client_ip_tags ] - parameters: - prefix: ip. - - - function: CN_ARRAY_ELEMENTS_PREPEND - lookup_fields: [ server_ip_tags ] - output_fields: [ server_ip_tags ] - parameters: - prefix: ip. - - - function: CN_ARRAY_ELEMENTS_PREPEND - lookup_fields: [ domain_tags ] + lookup_fields: [ domain_node_type,domain_malware,domain_tags ] output_fields: [ domain_tags ] - parameters: - prefix: domain. - - - function: CN_ARRAY_ELEMENTS_PREPEND - lookup_fields: [ app_tags ] - output_fields: [ app_tags ] - parameters: - prefix: app. postprocessing_pipelines: remove_field_processor: # [object] Processing Pipeline type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl - output_fields: [ recv_time,log_id,flags,start_timestamp_ms,end_timestamp_ms,duration_ms,decoded_as,client_ip,server_ip,client_port,server_port,app,app_transition,decoded_path,ip_protocol,l7_protocol,out_link_id,in_link_id,subscriber_id,imei,imsi,phone_number,apn,http_url,dns_rcode,dns_qname,dns_qtype,dns_rr,out_link_direction,in_link_direction,server_fqdn,server_domain,domain,domain_sld,domain_category_name,domain_category_group,domain_reputation_level,domain_icp_company_name,domain_whois_org,domain_tags,client_zone,client_country_region,client_super_admin_area,client_admin_area,client_longitude,client_latitude,client_isp,client_asn,client_ip_tags,server_zone,server_country_region,server_super_admin_area,server_admin_area,server_longitude,server_latitude,server_isp,server_asn,server_ip_tags,app_category,app_subcategory,app_company,app_company_category,app_tags,sent_pkts,sent_bytes,received_pkts,received_bytes,sessions,tcp_c2s_lost_bytes,tcp_s2c_lost_bytes,tcp_c2s_o3_pkts,tcp_s2c_o3_pkts,tcp_c2s_rtx_bytes,tcp_s2c_rtx_bytes,tcp_c2s_rtx_pkts,tcp_s2c_rtx_pkts,tcp_rtt_ms,http_response_latency_ms,ssl_handshake_latency_ms,dns_response_latency_ms,cn_internal_rule_id_list,cn_internal_ioc_type_list ] + output_fields: [ recv_time,log_id,flags,start_timestamp_ms,end_timestamp_ms,duration_ms,decoded_as,client_ip,server_ip,client_port,server_port,app,app_transition,decoded_path,ip_protocol,l7_protocol,out_link_id,in_link_id,subscriber_id,imei,imsi,phone_number,apn,http_url,dns_rcode,dns_qname,dns_qtype,dns_rr,out_link_direction,in_link_direction,server_fqdn,server_domain,domain,domain_sld,domain_category_name,domain_category_group,domain_reputation_level,domain_icp_company_name,domain_whois_org,domain_tags,client_zone,client_country_region,client_super_admin_area,client_admin_area,client_longitude,client_latitude,client_isp,client_asn,client_ip_tags,server_zone,server_country_region,server_super_admin_area,server_admin_area,server_longitude,server_latitude,server_isp,server_asn,server_ip_tags,app_category,app_subcategory,app_company,app_company_category,app_tags,sent_pkts,sent_bytes,received_pkts,received_bytes,sessions,tcp_c2s_lost_bytes,tcp_s2c_lost_bytes,tcp_c2s_o3_pkts,tcp_s2c_o3_pkts,tcp_c2s_rtx_bytes,tcp_s2c_rtx_bytes,tcp_c2s_rtx_pkts,tcp_s2c_rtx_pkts,tcp_rtt_ms,http_response_latency_ms,ssl_handshake_latency_ms,dns_response_latency_ms,cn_internal_rule_id_list,cn_internal_ioc_type_list,traffic_inbound_byte,traffic_inbound_pkt,traffic_outbound_byte,traffic_outbound_pkt,traffic_internal_byte,traffic_internal_pkt,traffic_through_byte,traffic_through_pkt,internal_query_num,external_query_num ] sinks: - kafka_sink_a: + cn_kafka_sink: type: kafka properties: topic: SESSION-RECORD-CN - kafka.bootstrap.servers: 192.168.44.55:9092 + kafka.bootstrap.servers: { { kafka_sink_servers } } kafka.retries: 0 kafka.linger.ms: 10 kafka.request.timeout.ms: 30000 @@ -304,29 +340,23 @@ sinks: kafka.buffer.memory: 134217728 kafka.max.request.size: 10485760 kafka.compression.type: snappy - kafka.security.protocol: - kafka.ssl.keystore.location: - kafka.ssl.keystore.password: - kafka.ssl.truststore.location: - kafka.ssl.truststore.password: - kafka.ssl.key.password: - kafka.sasl.mechanism: - kafka.sasl.jaas.config: + kafka.security.protocol: SASL_PLAINTEXT + kafka.sasl.mechanism: PLAIN + kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817 format: json -application: # [object] Application Configuration - env: # [object] Environment Variables - name: groot-stream-job # [string] Job Name - parallelism: 3 # [number] Job-Level Parallelism +application: + env: + name: etl_session_record_processed_kafka_to_cn_kafka + shade.identifier: aes pipeline: - object-reuse: true # [boolean] Object Reuse, default is false - topology: # [array of object] Node List. It will be used build data flow for job dag graph. - - name: kafka_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE. - #parallelism: 1 # [number] Operator-Level Parallelism. - downstream: [ session_record_processor ] # [array of string] Downstream Node Name List. + object-reuse: true + topology: + - name: kafka_source + downstream: [ session_record_processor ] - name: session_record_processor downstream: [ remove_field_processor ] - name: remove_field_processor - downstream: [ kafka_sink_a ] - - name: kafka_sink_a + downstream: [ cn_kafka_sink ] + - name: cn_kafka_sink downstream: [ ] -- cgit v1.2.3