summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pom.xml4
-rw-r--r--properties/default_config.properties2
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java2
-rw-r--r--src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java8
-rw-r--r--src/test/java/com/zdjizhi/FunctionTest.java7
5 files changed, 16 insertions, 7 deletions
diff --git a/pom.xml b/pom.xml
index 01b279d..14c63de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>radius-account-knowledge</artifactId>
- <version>220316-encryption</version>
+ <version>220413-sink</version>
<name>radius-account-knowledge</name>
<url>http://www.example.com</url>
@@ -116,7 +116,7 @@
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
- <version>1.0.7</version>
+ <version>1.0.8</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
diff --git a/properties/default_config.properties b/properties/default_config.properties
index f25c22e..787d12a 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -7,6 +7,7 @@ max.poll.records=3000
#kafka source poll bytes
max.partition.fetch.bytes=31457280
+
#====================Kafka KafkaProducer====================#
#producer���ԵĴ�������
retries=0
@@ -27,6 +28,7 @@ buffer.memory=134217728
#�������������ÿ�η��͸�Kafka���������������С,Ĭ��1048576
#10M
max.request.size=10485760
+
#====================kafka default====================#
#kafka SASL��֤�û���-����
kafka.user=nsyGpHKGFA4KW0zro9MDdw==
diff --git a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
index 3c0a1da..a875d0f 100644
--- a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
@@ -59,6 +59,7 @@ public class MapCompletedFunction implements MapFunction<String, String> {
} else {
knowledge.put("event_timestamp", (System.currentTimeMillis() / 1000));
}
+
/*
* 标识同一个连接:
* 1.数据若存在acct_multi_session_id属性,取该属性
@@ -69,6 +70,7 @@ public class MapCompletedFunction implements MapFunction<String, String> {
} else {
knowledge.put("acct_session_id", jsonMap.getString("radius_acct_multi_session_id"));
}
+
/*
用户的在线时长,以秒为单位,下线用户无此属性,默认为0
*/
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
index c259fd7..3b88909 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
@@ -38,10 +38,12 @@ public class KafkaProducer {
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
RadiusKnowledgeConfig.SINK_KAFKA_TOPIC,
new SimpleStringSchema(),
- createProducerConfig(), Optional.empty());
+ createProducerConfig(),
+ //sink与所有分区建立连接,轮询写入;
+ Optional.empty());
- kafkaProducer.setLogFailuresOnly(false);
-// kafkaProducer.setWriteTimestampToKafka(true);
+ //允许producer记录失败日志而不是捕获和抛出它们
+ kafkaProducer.setLogFailuresOnly(true);
return kafkaProducer;
}
diff --git a/src/test/java/com/zdjizhi/FunctionTest.java b/src/test/java/com/zdjizhi/FunctionTest.java
index 6b52828..f1bba26 100644
--- a/src/test/java/com/zdjizhi/FunctionTest.java
+++ b/src/test/java/com/zdjizhi/FunctionTest.java
@@ -5,6 +5,8 @@ import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import org.junit.Test;
+import java.util.Map;
+
/**
* @author qidaijie
* @Package com.zdjizhi
@@ -15,10 +17,11 @@ public class FunctionTest {
@Test
public void jsonTest(){
- String logs = "{\"name\":\"test\"}";
+ String logs = "{\"common_schema_type\":\"HTTP\",\"common_sessions\":1,\"http_request_line\":\"POST /portmap HTTP/1.1\",\"http_request_content_type\":\"application/json\",\"http_request_content_length\":21,\"http_user_agent\":\"Jakarta Commons-HttpClient/3.1\",\"http_host\":\"portmap.pro.testin.cn:8888\",\"http_url\":\"portmap.pro.testin.cn:8888/portmap\",\"http_response_line\":\"HTTP/1.1 200 OK\",\"http_response_content_length\":286,\"http_isn\":2504229113,\"http_proxy_flag\":0,\"http_version\":\"http1\",\"http_response_latency_ms\":2,\"http_session_duration_ms\":3,\"http_sequence\":1,\"common_protocol_label\":\"ETHERNET.VLAN.IPv4.TCP\",\"common_c2s_byte_diff\":555,\"common_c2s_pkt_diff\":5,\"common_s2c_byte_diff\":684,\"common_s2c_pkt_diff\":4,\"common_c2s_ipfrag_num\":0,\"common_s2c_ipfrag_num\":0,\"common_first_ttl\":64,\"common_c2s_tcp_unorder_num\":0,\"common_s2c_tcp_unorder_num\":0,\"common_c2s_tcp_lostlen\":0,\"common_s2c_tcp_lostlen\":0,\"common_c2s_pkt_retrans\":0,\"common_s2c_pkt_retrans\":0,\"common_c2s_byte_retrans\":0,\"common_s2c_byte_retrans\":0,\"common_direction\":69,\"common_l7_protocol\":\"HTTP\",\"common_tcp_client_isn\":2504229113,\"common_tcp_server_isn\":468823374,\"common_server_ip\":\"192.168.42.1\",\"common_client_ip\":\"192.168.56.14\",\"common_server_port\":8888,\"common_client_port\":51118,\"common_stream_dir\":3,\"common_address_type\":4,\"common_address_list\":\"51118-8888-192.168.56.14-192.168.42.1\",\"common_start_time\":1652247928,\"common_end_time\":1652247928,\"common_con_duration_ms\":4,\"common_s2c_pkt_num\":4,\"common_s2c_byte_num\":684,\"common_c2s_pkt_num\":5,\"common_c2s_byte_num\":555,\"common_establish_latency_ms\":2,\"http_action_file_size\":0,\"common_tunnels\":[{\"tunnels_schema_type\":\"VLAN\",\"vlan_c2s_direction_id\":[56],\"vlan_s2c_direction_id\":[56]},{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\",\"c2s_source_mac\":\"00:0e:c6:6f:dd:12\",\"c2s_destination_mac\":\"e8:1c:ba:cc:87:17\",\"s2c_source_mac\":\"e8:1c:ba:cc:87:17\",\"s2c_destination_mac\":\"00:0e:c6:6f:dd:12\"}],\"common_stream_trace_id\":\"869433624695175123\",\"common_l4_protocol\":\"IPv4_TCP\",\"common_sled_ip\":\"192.168.40.81\",\"common_device_id\":\"21426003\",\"common_device_tag\":\"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-three\\\"}]}\",\"common_policy_id\":0,\"common_service\":2,\"common_action\":0,\"common_link_info_c2s\":{\"functional_ip\":\"192.168.40.81\",\"thread_count\":43,\"relative_location\":\"internal\",\"hash_dist\":2,\"hash_algo\":0,\"linkinfo\":[{\"type\":\"ethernet\",\"src\":\"90:00:00:91:40:45\",\"dst\":\"0a:0a:0a:0a:00:0c\"},{\"type\":\"tuple4v4\",\"sip\":\"10.252.21.1\",\"dip\":\"10.10.0.12\",\"sport\":54789,\"dport\":4789},{\"type\":\"vxlan\",\"vlanid\":0,\"dir\":0,\"linkid\":22,\"linktype\":0},{\"type\":\"ethernet\",\"src\":\"00:0e:c6:6f:dd:12\",\"dst\":\"e8:1c:ba:cc:87:17\"},{\"type\":\"vlan\",\"vlan_id_array\":[56]}]},\"common_link_info_s2c\":{\"functional_ip\":\"192.168.40.81\",\"thread_count\":43,\"relative_location\":\"external\",\"hash_dist\":2,\"hash_algo\":0,\"linkinfo\":[{\"type\":\"ethernet\",\"src\":\"90:00:00:91:40:45\",\"dst\":\"0a:0a:0a:0a:00:0c\"},{\"type\":\"tuple4v4\",\"sip\":\"10.252.21.1\",\"dip\":\"10.10.0.12\",\"sport\":54789,\"dport\":4789},{\"type\":\"vxlan\",\"vlanid\":0,\"dir\":1,\"linkid\":22,\"linktype\":0},{\"type\":\"ethernet\",\"src\":\"e8:1c:ba:cc:87:17\",\"dst\":\"00:0e:c6:6f:dd:12\"},{\"type\":\"vlan\",\"vlan_id_array\":[56]}]}}\n";
Object document = Configuration.defaultConfiguration().jsonProvider().parse(logs);
System.out.println(JsonPath.read(document, "$.name").toString());
JSONObject jsonObject = JSONObject.parseObject(logs);
- System.out.println(jsonObject.getLong("age"));
+ System.out.println(jsonObject.getString("common_schema_type"));
+
}
}