author     gujinkai <[email protected]>    2024-04-29 09:54:07 +0800
committer  gujinkai <[email protected]>    2024-11-08 14:41:56 +0800
commit     4992788387968d6763d90d06ee9f06e5faa58915 (patch)
tree       ed280576df7f6d4518a6fef464fe90f18856f578
parent     110feff530f9940336c5378a9e0fe8ad6bd83ea4 (diff)
[CN-1650] fix: the range of tag is larger than expected at special env
(cherry picked from commit 202d67e007b23db421f6e5ba077248f27208220f)
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java    5
-rw-r--r--  groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IpTagUserDefineKnowledgeBaseHandler.java          5
-rw-r--r--  groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java                                21
-rw-r--r--  groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml                                  198
4 files changed, 140 insertions, 89 deletions
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java
index 716f72f..94fdae1 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IntelligenceIndicatorKnowledgeBaseHandler.java
@@ -106,8 +106,9 @@ public class IntelligenceIndicatorKnowledgeBaseHandler extends AbstractSingleKno
List<String> currentTags = new ArrayList<>(tags);
subRangeMap.put(Range.closed(startIpAddress, endIpAddress), currentTags);
rangeListMap.forEach((ipAddressRange, ipAddressRangeTags) -> {
- ipAddressRangeTags.addAll(tags);
- subRangeMap.put(ipAddressRange, ipAddressRangeTags);
+ List<String> newTags = new ArrayList<>(ipAddressRangeTags);
+ newTags.addAll(tags);
+ subRangeMap.put(ipAddressRange, newTags);
});
newIpTagMap.putAll(subRangeMap);
} else if ("Domain".equals(type)) {
diff --git a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IpTagUserDefineKnowledgeBaseHandler.java b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IpTagUserDefineKnowledgeBaseHandler.java
index 3b2f1e3..c33e1e0 100644
--- a/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IpTagUserDefineKnowledgeBaseHandler.java
+++ b/groot-core/src/main/java/com/geedgenetworks/core/udf/knowlegdebase/handler/IpTagUserDefineKnowledgeBaseHandler.java
@@ -100,8 +100,9 @@ public class IpTagUserDefineKnowledgeBaseHandler extends AbstractMultipleKnowled
nodes.add(node);
subRangeMap.put(Range.closed(startIpAddress, endIpAddress), nodes);
rangeListMap.forEach((ipAddressRange, ipAddressNode) -> {
- ipAddressNode.add(new Node(tagValue, id));
- subRangeMap.put(ipAddressRange, ipAddressNode);
+ List<Node> newNodeList = new ArrayList<>(ipAddressNode);
+ newNodeList.add(new Node(tagValue, id));
+ subRangeMap.put(ipAddressRange, newNodeList);
});
treeRangeMap.putAll(subRangeMap);
} catch (Exception lineException) {
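Both handler hunks above apply the same copy-before-mutate fix. The underlying problem is value aliasing in Guava's TreeRangeMap: subRangeMap(...).asMapOfRanges() hands back the original List instances, so appending tags to those lists in place also affects the portions of each stored range that lie outside the overlap, which is how tags ended up covering a larger IP range than expected. The standalone sketch below reproduces the issue under simplifying assumptions (Integer keys instead of IP addresses, a hypothetical demo class, Guava on the classpath); it is illustrative only, not code from this repository.

import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
import com.google.common.collect.TreeRangeMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class RangeTagAliasingSketch {
    public static void main(String[] args) {
        RangeMap<Integer, List<String>> ipTagMap = TreeRangeMap.create();
        // An existing CIDR-style range already carries one tag.
        ipTagMap.put(Range.closed(0, 255), new ArrayList<>(Arrays.asList("tagA")));

        // A new, narrower entry wants to add "tagB" to the overlapping slice only.
        Range<Integer> overlap = Range.closed(64, 64);
        Map<Range<Integer>, List<String>> overlapping =
                ipTagMap.subRangeMap(overlap).asMapOfRanges();

        RangeMap<Integer, List<String>> subRangeMap = TreeRangeMap.create();
        overlapping.forEach((range, tags) -> {
            // Pre-fix behaviour: calling tags.add("tagB") directly mutates the very list
            // that the surrounding [0, 255] range still references, so "tagB" leaks
            // beyond [64, 64].
            List<String> newTags = new ArrayList<>(tags); // the fix: copy before mutating
            newTags.add("tagB");
            subRangeMap.put(range, newTags);
        });
        ipTagMap.putAll(subRangeMap);

        System.out.println(ipTagMap.get(10)); // [tagA]        -> unchanged outside the overlap
        System.out.println(ipTagMap.get(64)); // [tagA, tagB]  -> extended only where intended
    }
}

Copying into a fresh list before mutating keeps the put scoped to the overlapping slice, which is exactly what the two hunks above do and what the new evaluate6 test below verifies.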
diff --git a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java
index 804c7ca..34fef6b 100644
--- a/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java
+++ b/groot-core/src/test/java/com/geedgenetworks/core/udf/cn/IntelligenceIndicatorLookupTest.java
@@ -27,7 +27,7 @@ public class IntelligenceIndicatorLookupTest {
void setUp() {
runtimeContext = mockRuntimeContext();
- String content = "type,ip_addr_format,ip1,ip2,domain,tags\nIP,CIDR,116.178.65.0,25,ali.com,\"阿里1,云服务1\"\nDomain,CIDR,116.178.65.0,25,$ali.com,\"阿里2,云服务2\"\nDomain,CIDR,116.178.65.0,25,*baidu.com,\"阿里3,云服务3\"";
+ String content = "type,ip_addr_format,ip1,ip2,domain,tags\nIP,CIDR,116.178.65.0,25,ali.com,\"阿里1,云服务1\"\nDomain,CIDR,116.178.65.0,25,$ali.com,\"阿里2,云服务2\"\nDomain,CIDR,116.178.65.0,25,*baidu.com,\"阿里3,云服务3\"\nIP,Single,116.178.65.64,116.178.65.64,ali.com,\"test\"";
mockKnowledgeBaseHandler(content);
intelligenceIndicatorLookup = new IntelligenceIndicatorLookup();
@@ -136,6 +136,25 @@ public class IntelligenceIndicatorLookupTest {
assertEquals(Arrays.asList("阿里3", "云服务3"), evaluate.getExtractedFields().get("domain_tags"));
}
+ @Test
+ void evaluate6() {
+ UDFContext udfContext = new UDFContext();
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("kb_name", kbName);
+ parameters.put("option", "IP_TO_TAG");
+ udfContext.setParameters(parameters);
+ udfContext.setLookup_fields(Collections.singletonList("server_ip"));
+ udfContext.setOutput_fields(Collections.singletonList("server_ip_tags"));
+ intelligenceIndicatorLookup.open(runtimeContext, udfContext);
+
+ Event event = new Event();
+ Map<String, Object> fields = new HashMap<>();
+ fields.put("server_ip", "116.178.65.64");
+ event.setExtractedFields(fields);
+ Event evaluate1 = intelligenceIndicatorLookup.evaluate(event);
+ assertEquals(Arrays.asList("阿里1", "云服务1", "test"), evaluate1.getExtractedFields().get("server_ip_tags"));
+ }
+
@AfterEach
void afterAll() {
clearState();
diff --git a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml
index 1e4224f..251a0ee 100644
--- a/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml
+++ b/groot-examples/cn-udf-example/src/main/resources/example/cn_grootstream_job_template.yaml
@@ -3,25 +3,17 @@ sources:
type: kafka
# fields: # [array of object] Field List, if not set, all fields(Map<String, Object>) will be output.
properties: # [object] Source Properties
- topic: SESSION-RECORD
- kafka.bootstrap.servers: 192.168.44.11:9094,192.168.44.13:9094,192.168.44.14:9094,192.168.44.15:9094,192.168.44.16:9094
+ topic: SESSION-RECORD-PROCESSED
+ kafka.bootstrap.servers: { { tsg_olap_kafka_servers } }
kafka.session.timeout.ms: 60000
kafka.max.poll.records: 3000
kafka.max.partition.fetch.bytes: 31457280
kafka.security.protocol: SASL_PLAINTEXT
- kafka.ssl.keystore.location:
- kafka.ssl.keystore.password:
- kafka.ssl.truststore.location:
- kafka.ssl.truststore.password:
- kafka.ssl.key.password:
kafka.sasl.mechanism: PLAIN
- kafka.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
- kafka.buffer.memory:
- kafka.group.id: 44.55-test
+ kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
+ kafka.group.id: etl_processed_session_record_kafka_to_cn_kafka
kafka.auto.offset.reset: latest
- kafka.max.request.size:
- kafka.compression.type: none
- format: json # [string] Data Format, default is json
+ format: json
processing_pipelines:
session_record_processor: # [object] Processing Pipeline
@@ -29,19 +21,6 @@ processing_pipelines:
remove_fields:
output_fields:
functions: # [array of object] Function List
- - function: SNOWFLAKE_ID
- lookup_fields: [ '' ]
- output_fields: [ log_id ]
- filter:
- parameters:
- data_center_id_num: 1
-
- - function: UNIX_TIMESTAMP_CONVERTER
- lookup_fields: [ __timestamp ]
- output_fields: [ recv_time ]
- parameters:
- precision: seconds
-
- function: EVAL
output_fields: [ domain ]
parameters:
@@ -187,19 +166,100 @@ processing_pipelines:
kb_name: none
#kb_name: cn_internal_ip
- - function: CN_VPN_LOOKUP
- lookup_fields: [ server_ip ]
- output_fields: [ server_vpn_service_name ]
+ - function: EVAL
+ output_fields: [ sent_bytes ]
parameters:
- kb_name: cn_vpn_learning_ip
- option: IP_TO_VPN
+ value_expression: "sent_bytes == null ? 0 : sent_bytes"
- - function: CN_VPN_LOOKUP
- lookup_fields: [ domain ]
- output_fields: [ domain_vpn_service_name ]
+ - function: EVAL
+ output_fields: [ sent_pkts ]
parameters:
- kb_name: cn_vpn_learning_domain
- option: DOMAIN_TO_VPN
+ value_expression: "sent_pkts == null ? 0 : sent_pkts"
+
+ - function: EVAL
+ output_fields: [ received_bytes ]
+ parameters:
+ value_expression: "received_bytes == null ? 0 : received_bytes"
+
+ - function: EVAL
+ output_fields: [ received_pkts ]
+ parameters:
+ value_expression: "received_pkts == null ? 0 : received_pkts"
+
+ - function: EVAL
+ output_fields: [ traffic_inbound_byte ]
+ parameters:
+ value_expression: "client_zone == 'internal' && server_zone == 'external' ? received_bytes : traffic_inbound_byte"
+
+ - function: EVAL
+ output_fields: [ traffic_outbound_byte ]
+ parameters:
+ value_expression: "client_zone == 'external' && server_zone == 'internal' ? received_bytes : traffic_outbound_byte"
+
+ - function: EVAL
+ output_fields: [ traffic_inbound_pkt ]
+ parameters:
+ value_expression: "client_zone == 'internal' && server_zone == 'external' ? received_pkts : traffic_inbound_pkt"
+
+ - function: EVAL
+ output_fields: [ traffic_outbound_pkt ]
+ parameters:
+ value_expression: "client_zone == 'external' && server_zone == 'internal' ? received_pkts : traffic_outbound_pkt"
+
+ - function: EVAL
+ output_fields: [ traffic_outbound_byte ]
+ parameters:
+ value_expression: "client_zone == 'internal' && server_zone == 'external' ? sent_bytes : traffic_outbound_byte"
+
+ - function: EVAL
+ output_fields: [ traffic_inbound_byte ]
+ parameters:
+ value_expression: "client_zone == 'external' && server_zone == 'internal' ? sent_bytes : traffic_inbound_byte"
+
+ - function: EVAL
+ output_fields: [ traffic_outbound_pkt ]
+ parameters:
+ value_expression: "client_zone == 'internal' && server_zone == 'external' ? sent_pkts : traffic_outbound_pkt"
+
+ - function: EVAL
+ output_fields: [ traffic_inbound_pkt ]
+ parameters:
+ value_expression: "client_zone == 'external' && server_zone == 'internal' ? sent_pkts : traffic_inbound_pkt"
+
+ - function: EVAL
+ output_fields: [ traffic_internal_byte ]
+ parameters:
+ value_expression: "client_zone == 'internal' && server_zone == 'internal' ? sent_bytes + received_bytes : traffic_internal_byte"
+
+ - function: EVAL
+ output_fields: [ traffic_internal_pkt ]
+ parameters:
+ value_expression: "client_zone == 'internal' && server_zone == 'internal' ? sent_pkts + received_pkts : traffic_internal_pkt"
+
+ - function: EVAL
+ output_fields: [ traffic_through_byte ]
+ parameters:
+ value_expression: "client_zone == 'external' && server_zone == 'external' ? sent_bytes + received_bytes : traffic_through_byte"
+
+ - function: EVAL
+ output_fields: [ traffic_through_pkt ]
+ parameters:
+ value_expression: "client_zone == 'external' && server_zone == 'external' ? sent_pkts + received_pkts : traffic_through_pkt"
+
+ - function: EVAL
+ output_fields: [ sessions ]
+ parameters:
+ value_expression: "1"
+
+ - function: EVAL
+ output_fields: [ internal_query_num ]
+ parameters:
+ value_expression: "client_zone == 'internal' ? sessions : internal_query_num"
+
+ - function: EVAL
+ output_fields: [ external_query_num ]
+ parameters:
+ value_expression: "client_zone == 'external' ? sessions : external_query_num"
- function: CN_ANONYMITY_LOOKUP
lookup_fields: [ server_ip ]
@@ -255,48 +315,24 @@ processing_pipelines:
output_fields: [ client_ip_tags ]
- function: GENERATE_STRING_ARRAY
- lookup_fields: [ server_idc_renter,server_dns_server,server_node_type,server_malware,server_vpn_service_name,server_ip_tags ]
+ lookup_fields: [ server_idc_renter,server_dns_server,server_node_type,server_malware,server_ip_tags ]
output_fields: [ server_ip_tags ]
- function: GENERATE_STRING_ARRAY
- lookup_fields: [ domain_node_type,domain_malware,domain_vpn_service_name,domain_tags ]
- output_fields: [ domain_tags ]
-
- - function: CN_ARRAY_ELEMENTS_PREPEND
- lookup_fields: [ client_ip_tags ]
- output_fields: [ client_ip_tags ]
- parameters:
- prefix: ip.
-
- - function: CN_ARRAY_ELEMENTS_PREPEND
- lookup_fields: [ server_ip_tags ]
- output_fields: [ server_ip_tags ]
- parameters:
- prefix: ip.
-
- - function: CN_ARRAY_ELEMENTS_PREPEND
- lookup_fields: [ domain_tags ]
+ lookup_fields: [ domain_node_type,domain_malware,domain_tags ]
output_fields: [ domain_tags ]
- parameters:
- prefix: domain.
-
- - function: CN_ARRAY_ELEMENTS_PREPEND
- lookup_fields: [ app_tags ]
- output_fields: [ app_tags ]
- parameters:
- prefix: app.
postprocessing_pipelines:
remove_field_processor: # [object] Processing Pipeline
type: com.geedgenetworks.core.processor.projection.ProjectionProcessorImpl
- output_fields: [ recv_time,log_id,flags,start_timestamp_ms,end_timestamp_ms,duration_ms,decoded_as,client_ip,server_ip,client_port,server_port,app,app_transition,decoded_path,ip_protocol,l7_protocol,out_link_id,in_link_id,subscriber_id,imei,imsi,phone_number,apn,http_url,dns_rcode,dns_qname,dns_qtype,dns_rr,out_link_direction,in_link_direction,server_fqdn,server_domain,domain,domain_sld,domain_category_name,domain_category_group,domain_reputation_level,domain_icp_company_name,domain_whois_org,domain_tags,client_zone,client_country_region,client_super_admin_area,client_admin_area,client_longitude,client_latitude,client_isp,client_asn,client_ip_tags,server_zone,server_country_region,server_super_admin_area,server_admin_area,server_longitude,server_latitude,server_isp,server_asn,server_ip_tags,app_category,app_subcategory,app_company,app_company_category,app_tags,sent_pkts,sent_bytes,received_pkts,received_bytes,sessions,tcp_c2s_lost_bytes,tcp_s2c_lost_bytes,tcp_c2s_o3_pkts,tcp_s2c_o3_pkts,tcp_c2s_rtx_bytes,tcp_s2c_rtx_bytes,tcp_c2s_rtx_pkts,tcp_s2c_rtx_pkts,tcp_rtt_ms,http_response_latency_ms,ssl_handshake_latency_ms,dns_response_latency_ms,cn_internal_rule_id_list,cn_internal_ioc_type_list ]
+ output_fields: [ recv_time,log_id,flags,start_timestamp_ms,end_timestamp_ms,duration_ms,decoded_as,client_ip,server_ip,client_port,server_port,app,app_transition,decoded_path,ip_protocol,l7_protocol,out_link_id,in_link_id,subscriber_id,imei,imsi,phone_number,apn,http_url,dns_rcode,dns_qname,dns_qtype,dns_rr,out_link_direction,in_link_direction,server_fqdn,server_domain,domain,domain_sld,domain_category_name,domain_category_group,domain_reputation_level,domain_icp_company_name,domain_whois_org,domain_tags,client_zone,client_country_region,client_super_admin_area,client_admin_area,client_longitude,client_latitude,client_isp,client_asn,client_ip_tags,server_zone,server_country_region,server_super_admin_area,server_admin_area,server_longitude,server_latitude,server_isp,server_asn,server_ip_tags,app_category,app_subcategory,app_company,app_company_category,app_tags,sent_pkts,sent_bytes,received_pkts,received_bytes,sessions,tcp_c2s_lost_bytes,tcp_s2c_lost_bytes,tcp_c2s_o3_pkts,tcp_s2c_o3_pkts,tcp_c2s_rtx_bytes,tcp_s2c_rtx_bytes,tcp_c2s_rtx_pkts,tcp_s2c_rtx_pkts,tcp_rtt_ms,http_response_latency_ms,ssl_handshake_latency_ms,dns_response_latency_ms,cn_internal_rule_id_list,cn_internal_ioc_type_list,traffic_inbound_byte,traffic_inbound_pkt,traffic_outbound_byte,traffic_outbound_pkt,traffic_internal_byte,traffic_internal_pkt,traffic_through_byte,traffic_through_pkt,internal_query_num,external_query_num ]
sinks:
- kafka_sink_a:
+ cn_kafka_sink:
type: kafka
properties:
topic: SESSION-RECORD-CN
- kafka.bootstrap.servers: 192.168.44.55:9092
+ kafka.bootstrap.servers: { { kafka_sink_servers } }
kafka.retries: 0
kafka.linger.ms: 10
kafka.request.timeout.ms: 30000
@@ -304,29 +340,23 @@ sinks:
kafka.buffer.memory: 134217728
kafka.max.request.size: 10485760
kafka.compression.type: snappy
- kafka.security.protocol:
- kafka.ssl.keystore.location:
- kafka.ssl.keystore.password:
- kafka.ssl.truststore.location:
- kafka.ssl.truststore.password:
- kafka.ssl.key.password:
- kafka.sasl.mechanism:
- kafka.sasl.jaas.config:
+ kafka.security.protocol: SASL_PLAINTEXT
+ kafka.sasl.mechanism: PLAIN
+ kafka.sasl.jaas.config: 454f65ea6eef1256e3067104f82730e737b68959560966b811e7ff364116b03124917eb2b0f3596f14733aa29ebad9352644ce1a5c85991c6f01ba8a5e8f177a7ff0b2d3889a424249967b3870b50993d9644f239f0de82cdb13bdb502959e16afadffa49ef1e1d2b9c9b5113e619817
format: json
-application: # [object] Application Configuration
- env: # [object] Environment Variables
- name: groot-stream-job # [string] Job Name
- parallelism: 3 # [number] Job-Level Parallelism
+application:
+ env:
+ name: etl_session_record_processed_kafka_to_cn_kafka
+ shade.identifier: aes
pipeline:
- object-reuse: true # [boolean] Object Reuse, default is false
- topology: # [array of object] Node List. It will be used build data flow for job dag graph.
- - name: kafka_source # [string] Node Name, must be unique. It will be used as the name of the corresponding Flink operator. eg. kafka_source the processor type as SOURCE.
- #parallelism: 1 # [number] Operator-Level Parallelism.
- downstream: [ session_record_processor ] # [array of string] Downstream Node Name List.
+ object-reuse: true
+ topology:
+ - name: kafka_source
+ downstream: [ session_record_processor ]
- name: session_record_processor
downstream: [ remove_field_processor ]
- name: remove_field_processor
- downstream: [ kafka_sink_a ]
- - name: kafka_sink_a
+ downstream: [ cn_kafka_sink ]
+ - name: cn_kafka_sink
downstream: [ ]
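For reference, the new chain of EVAL functions in the job template classifies each session's byte and packet counters by the client_zone / server_zone combination (after null counters are normalized to 0). The sketch below restates that logic in plain Java; the field names mirror the YAML, but the Map-based record and the accumulate helper are hypothetical stand-ins, not code from this repository.

import java.util.HashMap;
import java.util.Map;

public class TrafficDirectionSketch {

    // Mirrors the EVAL chain: attribute a session's byte/packet counters to the
    // inbound/outbound/internal/through buckets based on client_zone and server_zone.
    static void accumulate(String clientZone, String serverZone,
                           long sentBytes, long receivedBytes,
                           long sentPkts, long receivedPkts,
                           Map<String, Long> out) {
        if ("internal".equals(clientZone) && "external".equals(serverZone)) {
            // Internal client, external server: what the client receives is inbound,
            // what it sends is outbound.
            out.merge("traffic_inbound_byte", receivedBytes, Long::sum);
            out.merge("traffic_outbound_byte", sentBytes, Long::sum);
            out.merge("traffic_inbound_pkt", receivedPkts, Long::sum);
            out.merge("traffic_outbound_pkt", sentPkts, Long::sum);
        } else if ("external".equals(clientZone) && "internal".equals(serverZone)) {
            // External client, internal server: directions are mirrored.
            out.merge("traffic_outbound_byte", receivedBytes, Long::sum);
            out.merge("traffic_inbound_byte", sentBytes, Long::sum);
            out.merge("traffic_outbound_pkt", receivedPkts, Long::sum);
            out.merge("traffic_inbound_pkt", sentPkts, Long::sum);
        } else if ("internal".equals(clientZone) && "internal".equals(serverZone)) {
            // Both endpoints internal: all traffic counts as internal.
            out.merge("traffic_internal_byte", sentBytes + receivedBytes, Long::sum);
            out.merge("traffic_internal_pkt", sentPkts + receivedPkts, Long::sum);
        } else if ("external".equals(clientZone) && "external".equals(serverZone)) {
            // Both endpoints external: transit ("through") traffic.
            out.merge("traffic_through_byte", sentBytes + receivedBytes, Long::sum);
            out.merge("traffic_through_pkt", sentPkts + receivedPkts, Long::sum);
        }
        // sessions is fixed at 1 per record; the query counters attribute it to the
        // side that opened the session.
        if ("internal".equals(clientZone)) out.merge("internal_query_num", 1L, Long::sum);
        if ("external".equals(clientZone)) out.merge("external_query_num", 1L, Long::sum);
    }

    public static void main(String[] args) {
        Map<String, Long> counters = new HashMap<>();
        accumulate("internal", "external", 1200, 4800, 10, 40, counters);
        System.out.println(counters);
        // e.g. traffic_inbound_byte=4800, traffic_outbound_byte=1200,
        //      traffic_inbound_pkt=40, traffic_outbound_pkt=10, internal_query_num=1
    }
}

Note that, as in the YAML, a counter is only set for the zone combination that applies; all other traffic_* fields are left at whatever value they already carry.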