diff options
| author | qidaijie <[email protected]> | 2022-08-15 10:51:10 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2022-08-15 10:51:10 +0800 |
| commit | a21df7afe13ecc64eb321733905c904dc3e7202e (patch) | |
| tree | 1ff3b5091f509d1e219079f646a3e2c467b18524 | |
| parent | a8195cd3e23deb779258e70f95f203259b020d31 (diff) | |
修改kafka producer参数配置
| -rw-r--r-- | pom.xml | 4 | ||||
| -rw-r--r-- | properties/default_config.properties | 2 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java | 2 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java | 8 | ||||
| -rw-r--r-- | src/test/java/com/zdjizhi/FunctionTest.java | 7 |
5 files changed, 16 insertions, 7 deletions
@@ -6,7 +6,7 @@ <groupId>com.zdjizhi</groupId> <artifactId>radius-account-knowledge</artifactId> - <version>220316-encryption</version> + <version>220413-sink</version> <name>radius-account-knowledge</name> <url>http://www.example.com</url> @@ -116,7 +116,7 @@ <dependency> <groupId>com.zdjizhi</groupId> <artifactId>galaxy</artifactId> - <version>1.0.7</version> + <version>1.0.8</version> <exclusions> <exclusion> <artifactId>slf4j-log4j12</artifactId> diff --git a/properties/default_config.properties b/properties/default_config.properties index f25c22e..787d12a 100644 --- a/properties/default_config.properties +++ b/properties/default_config.properties @@ -7,6 +7,7 @@ max.poll.records=3000 #kafka source poll bytes max.partition.fetch.bytes=31457280 + #====================Kafka KafkaProducer====================# #producer���ԵĴ������� retries=0 @@ -27,6 +28,7 @@ buffer.memory=134217728 #�������������ÿ�η���Kafka���������������С,Ĭ��1048576 #10M max.request.size=10485760 + #====================kafka default====================# #kafka SASL��֤�û���-���� kafka.user=nsyGpHKGFA4KW0zro9MDdw== diff --git a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java index 3c0a1da..a875d0f 100644 --- a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java @@ -59,6 +59,7 @@ public class MapCompletedFunction implements MapFunction<String, String> { } else { knowledge.put("event_timestamp", (System.currentTimeMillis() / 1000)); } + /* * 标识同一个连接: * 1.数据若存在acct_multi_session_id属性,取该属性 @@ -69,6 +70,7 @@ public class MapCompletedFunction implements MapFunction<String, String> { } else { knowledge.put("acct_session_id", jsonMap.getString("radius_acct_multi_session_id")); } + /* 用户的在线时长,以秒为单位,下线用户无此属性,默认为0 */ diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java index c259fd7..3b88909 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java @@ -38,10 +38,12 @@ public class KafkaProducer { FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>( RadiusKnowledgeConfig.SINK_KAFKA_TOPIC, new SimpleStringSchema(), - createProducerConfig(), Optional.empty()); + createProducerConfig(), + //sink与所有分区建立连接,轮询写入; + Optional.empty()); - kafkaProducer.setLogFailuresOnly(false); -// kafkaProducer.setWriteTimestampToKafka(true); + //允许producer记录失败日志而不是捕获和抛出它们 + kafkaProducer.setLogFailuresOnly(true); return kafkaProducer; } diff --git a/src/test/java/com/zdjizhi/FunctionTest.java b/src/test/java/com/zdjizhi/FunctionTest.java index 6b52828..f1bba26 100644 --- a/src/test/java/com/zdjizhi/FunctionTest.java +++ b/src/test/java/com/zdjizhi/FunctionTest.java @@ -5,6 +5,8 @@ import com.jayway.jsonpath.Configuration; import com.jayway.jsonpath.JsonPath; import org.junit.Test; +import java.util.Map; + /** * @author qidaijie * @Package com.zdjizhi @@ -15,10 +17,11 @@ public class FunctionTest { @Test public void jsonTest(){ - String logs = "{\"name\":\"test\"}"; + String logs = "{\"common_schema_type\":\"HTTP\",\"common_sessions\":1,\"http_request_line\":\"POST /portmap HTTP/1.1\",\"http_request_content_type\":\"application/json\",\"http_request_content_length\":21,\"http_user_agent\":\"Jakarta Commons-HttpClient/3.1\",\"http_host\":\"portmap.pro.testin.cn:8888\",\"http_url\":\"portmap.pro.testin.cn:8888/portmap\",\"http_response_line\":\"HTTP/1.1 200 OK\",\"http_response_content_length\":286,\"http_isn\":2504229113,\"http_proxy_flag\":0,\"http_version\":\"http1\",\"http_response_latency_ms\":2,\"http_session_duration_ms\":3,\"http_sequence\":1,\"common_protocol_label\":\"ETHERNET.VLAN.IPv4.TCP\",\"common_c2s_byte_diff\":555,\"common_c2s_pkt_diff\":5,\"common_s2c_byte_diff\":684,\"common_s2c_pkt_diff\":4,\"common_c2s_ipfrag_num\":0,\"common_s2c_ipfrag_num\":0,\"common_first_ttl\":64,\"common_c2s_tcp_unorder_num\":0,\"common_s2c_tcp_unorder_num\":0,\"common_c2s_tcp_lostlen\":0,\"common_s2c_tcp_lostlen\":0,\"common_c2s_pkt_retrans\":0,\"common_s2c_pkt_retrans\":0,\"common_c2s_byte_retrans\":0,\"common_s2c_byte_retrans\":0,\"common_direction\":69,\"common_l7_protocol\":\"HTTP\",\"common_tcp_client_isn\":2504229113,\"common_tcp_server_isn\":468823374,\"common_server_ip\":\"192.168.42.1\",\"common_client_ip\":\"192.168.56.14\",\"common_server_port\":8888,\"common_client_port\":51118,\"common_stream_dir\":3,\"common_address_type\":4,\"common_address_list\":\"51118-8888-192.168.56.14-192.168.42.1\",\"common_start_time\":1652247928,\"common_end_time\":1652247928,\"common_con_duration_ms\":4,\"common_s2c_pkt_num\":4,\"common_s2c_byte_num\":684,\"common_c2s_pkt_num\":5,\"common_c2s_byte_num\":555,\"common_establish_latency_ms\":2,\"http_action_file_size\":0,\"common_tunnels\":[{\"tunnels_schema_type\":\"VLAN\",\"vlan_c2s_direction_id\":[56],\"vlan_s2c_direction_id\":[56]},{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\",\"c2s_source_mac\":\"00:0e:c6:6f:dd:12\",\"c2s_destination_mac\":\"e8:1c:ba:cc:87:17\",\"s2c_source_mac\":\"e8:1c:ba:cc:87:17\",\"s2c_destination_mac\":\"00:0e:c6:6f:dd:12\"}],\"common_stream_trace_id\":\"869433624695175123\",\"common_l4_protocol\":\"IPv4_TCP\",\"common_sled_ip\":\"192.168.40.81\",\"common_device_id\":\"21426003\",\"common_device_tag\":\"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-three\\\"}]}\",\"common_policy_id\":0,\"common_service\":2,\"common_action\":0,\"common_link_info_c2s\":{\"functional_ip\":\"192.168.40.81\",\"thread_count\":43,\"relative_location\":\"internal\",\"hash_dist\":2,\"hash_algo\":0,\"linkinfo\":[{\"type\":\"ethernet\",\"src\":\"90:00:00:91:40:45\",\"dst\":\"0a:0a:0a:0a:00:0c\"},{\"type\":\"tuple4v4\",\"sip\":\"10.252.21.1\",\"dip\":\"10.10.0.12\",\"sport\":54789,\"dport\":4789},{\"type\":\"vxlan\",\"vlanid\":0,\"dir\":0,\"linkid\":22,\"linktype\":0},{\"type\":\"ethernet\",\"src\":\"00:0e:c6:6f:dd:12\",\"dst\":\"e8:1c:ba:cc:87:17\"},{\"type\":\"vlan\",\"vlan_id_array\":[56]}]},\"common_link_info_s2c\":{\"functional_ip\":\"192.168.40.81\",\"thread_count\":43,\"relative_location\":\"external\",\"hash_dist\":2,\"hash_algo\":0,\"linkinfo\":[{\"type\":\"ethernet\",\"src\":\"90:00:00:91:40:45\",\"dst\":\"0a:0a:0a:0a:00:0c\"},{\"type\":\"tuple4v4\",\"sip\":\"10.252.21.1\",\"dip\":\"10.10.0.12\",\"sport\":54789,\"dport\":4789},{\"type\":\"vxlan\",\"vlanid\":0,\"dir\":1,\"linkid\":22,\"linktype\":0},{\"type\":\"ethernet\",\"src\":\"e8:1c:ba:cc:87:17\",\"dst\":\"00:0e:c6:6f:dd:12\"},{\"type\":\"vlan\",\"vlan_id_array\":[56]}]}}\n"; Object document = Configuration.defaultConfiguration().jsonProvider().parse(logs); System.out.println(JsonPath.read(document, "$.name").toString()); JSONObject jsonObject = JSONObject.parseObject(logs); - System.out.println(jsonObject.getLong("age")); + System.out.println(jsonObject.getString("common_schema_type")); + } } |
