summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2022-08-15 11:29:30 +0800
committerqidaijie <[email protected]>2022-08-15 11:29:30 +0800
commit0a13d028f4148fcdc9c377a972000501ae4d0704 (patch)
tree46e38b85724a96e14835198537d8df67ad57019d
parenta21df7afe13ecc64eb321733905c904dc3e7202e (diff)
删除Fastjson,改为使用hutool工具类解析json。
-rw-r--r--pom.xml16
-rw-r--r--properties/default_config.properties6
-rw-r--r--properties/service_flow_config.properties20
-rw-r--r--src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java22
-rw-r--r--src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java6
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java66
-rw-r--r--src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java12
-rw-r--r--src/test/java/com/zdjizhi/FunctionTest.java27
8 files changed, 59 insertions, 116 deletions
diff --git a/pom.xml b/pom.xml
index 14c63de..91bddb3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>radius-account-knowledge</artifactId>
- <version>220413-sink</version>
+ <version>220815-JSON</version>
<name>radius-account-knowledge</name>
<url>http://www.example.com</url>
@@ -38,6 +38,8 @@
<hadoop.version>2.7.1</hadoop.version>
<kafka.version>1.0.0</kafka.version>
<hbase.version>2.2.3</hbase.version>
+ <hutool.version>5.7.17</hutool.version>
+ <jsonpath.version>2.4.0</jsonpath.version>
<!--<scope.type>provided</scope.type>-->
<scope.type>compile</scope.type>
</properties>
@@ -129,13 +131,6 @@
</exclusions>
</dependency>
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.70</version>
- </dependency>
-
-
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
<groupId>org.apache.flink</groupId>
@@ -193,14 +188,13 @@
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
- <version>2.4.0</version>
+ <version>${jsonpath.version}</version>
</dependency>
-
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
- <version>5.5.2</version>
+ <version>${hutool.version}</version>
</dependency>
<dependency>
diff --git a/properties/default_config.properties b/properties/default_config.properties
index 787d12a..a683f33 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -29,6 +29,12 @@ buffer.memory=134217728
#10M
max.request.size=10485760
+#������ѹ��ģʽ none or snappy
+producer.kafka.compression.type=none
+
+#������ack
+producer.ack=1
+
#====================kafka default====================#
#kafka SASL��֤�û���-����
kafka.user=nsyGpHKGFA4KW0zro9MDdw==
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 4beb64a..05f7466 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,27 +1,19 @@
-#--------------------------------地址配置------------------------------#
-
+#--------------------------------kafka消费组信息------------------------------#
#管理kafka地址
source.kafka.servers=192.168.44.12:9094
-#管理输出kafka地址
-sink.kafka.servers=192.168.44.12:9094
-
-#--------------------------------Kafka消费组信息------------------------------#
-
#kafka 接收数据topic
source.kafka.topic=test
-#补全数据 输出 topic
-sink.kafka.topic=RADIUS-ONFF
-
#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
group.id=radius-on-off-flink-20210615
-#生产者压缩模式 none or snappy
-producer.kafka.compression.type=none
+#--------------------------------Kafka生产者信息------------------------------#
+#管理输出kafka地址
+sink.kafka.servers=192.168.44.12:9094
-#生产者ack
-producer.ack=1
+#补全数据 输出 topic
+sink.kafka.topic=RADIUS-ONFF
#--------------------------------topology配置------------------------------#
#定位库地址
diff --git a/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java b/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java
index cb9ead3..04ff500 100644
--- a/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java
+++ b/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java
@@ -32,28 +32,12 @@ public class RadiusKnowledgeConfig {
* 2、停止计费
*/
public static final int STOP_BILLING = 2;
+
/**
* 报文类型
*/
public static final String RADIUS_PACKET_TYPE = "radius_packet_type";
- /**
- * 计费请求报文类型
- */
- public static final String RADIUS_ACCT_STATUS_TYPE = "radius_acct_status_type";
- /**
- * 发送计费请求报文时间戳
- */
- public static final String RADIUS_EVENT_TIMESTAMP = "radius_event_timestamp";
- /**
- * 一个用户多个计费ID关联属性
- */
- public static final String RADIUS_MULTI_SESSION_ID = "radius_acct_multi_session_id";
-
- /**
- * 用户的在线时长,以秒为单位
- */
- public static final String RADIUS_SESSION_TIME = "radius_acct_session_time";
/**
@@ -64,8 +48,8 @@ public class RadiusKnowledgeConfig {
public static final String SINK_KAFKA_SERVERS = RadiusKnowledgeConfigurations.getStringProperty(0, "sink.kafka.servers");
public static final String SINK_KAFKA_TOPIC = RadiusKnowledgeConfigurations.getStringProperty(0, "sink.kafka.topic");
public static final String GROUP_ID = RadiusKnowledgeConfigurations.getStringProperty(0, "group.id");
- public static final String PRODUCER_ACK = RadiusKnowledgeConfigurations.getStringProperty(0, "producer.ack");
- public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = RadiusKnowledgeConfigurations.getStringProperty(0, "producer.kafka.compression.type");
+ public static final String PRODUCER_ACK = RadiusKnowledgeConfigurations.getStringProperty(1, "producer.ack");
+ public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = RadiusKnowledgeConfigurations.getStringProperty(1, "producer.kafka.compression.type");
/**
* default config
diff --git a/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java b/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java
index 3027bfa..afd714a 100644
--- a/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java
+++ b/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java
@@ -2,6 +2,7 @@ package com.zdjizhi.topology;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.RadiusKnowledgeConfig;
import com.zdjizhi.utils.functions.FilterNullFunction;
import com.zdjizhi.utils.functions.MapCompletedFunction;
import com.zdjizhi.utils.kafka.KafkaConsumer;
@@ -24,10 +25,10 @@ public class RadiusKnowledgeTopology {
DataStreamSource<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer());
- DataStream<String> result = streamSource.map(new MapCompletedFunction()).name("RadiusOnOffMap")
+ DataStream<String> result = streamSource.map(new MapCompletedFunction()).name(RadiusKnowledgeConfig.SOURCE_KAFKA_TOPIC)
.filter(new FilterNullFunction()).name("FilterAbnormalData");
- result.addSink(KafkaProducer.getKafkaProducer()).name("LogSink");
+ result.addSink(KafkaProducer.getKafkaProducer()).name(RadiusKnowledgeConfig.SINK_KAFKA_TOPIC);
try {
environment.execute("RADIUS-ON-OFF");
@@ -37,5 +38,4 @@ public class RadiusKnowledgeTopology {
}
-
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
index a875d0f..7131c08 100644
--- a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
@@ -1,8 +1,8 @@
package com.zdjizhi.utils.functions;
+import cn.hutool.json.JSONObject;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.common.RadiusKnowledgeConfig;
import com.zdjizhi.utils.StringUtil;
import org.apache.flink.api.common.functions.MapFunction;
@@ -21,17 +21,17 @@ public class MapCompletedFunction implements MapFunction<String, String> {
String result = null;
try {
if (StringUtil.isNotBlank(logs)) {
- JSONObject jsonMap = JSONObject.parseObject(logs);
- if (jsonMap.containsKey(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE)) {
- int packetType = jsonMap.getInteger(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE);
+ JSONObject json = new JSONObject(logs, false, true);
+ if (json.containsKey(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE)) {
+ int packetType = json.getInt(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE);
if (RadiusKnowledgeConfig.ACCOUNTING_REQUEST == packetType) {
- result = GetKnowledgeLogs(jsonMap);
+ result = GetKnowledgeLogs(json);
}
}
}
} catch (RuntimeException e) {
- logger.error("数据解析异常,异常信息:" + e);
+ logger.error("Radius log parsing exception,Message is :" + e);
}
return result;
}
@@ -44,42 +44,36 @@ public class MapCompletedFunction implements MapFunction<String, String> {
*/
private static String GetKnowledgeLogs(JSONObject jsonMap) {
JSONObject knowledge = new JSONObject();
+ String framedIp = jsonMap.getStr("radius_framed_ip");
+ String account = jsonMap.getStr("radius_account");
- knowledge.put("framed_ip", jsonMap.getString("radius_framed_ip"));
+ if (StringUtil.isNotBlank(framedIp) && StringUtil.isNotBlank(account)) {
+ knowledge.set("framed_ip", framedIp);
- knowledge.put("account", jsonMap.getString("radius_account"));
+ knowledge.set("account", account);
- knowledge.put("acct_status_type", jsonMap.getString("radius_acct_status_type"));
+ //计费请求报文类型
+ knowledge.set("acct_status_type", jsonMap.getStr("radius_acct_status_type"));
- /*
- 如果存在时间戳则选择此时间戳没有获取当前时间
- */
- if (jsonMap.containsKey(RadiusKnowledgeConfig.RADIUS_EVENT_TIMESTAMP)) {
- knowledge.put("event_timestamp", jsonMap.getString("radius_event_timestamp"));
- } else {
- knowledge.put("event_timestamp", (System.currentTimeMillis() / 1000));
- }
+ //如果存在时间戳则选择此时间戳没有获取当前时间
+ knowledge.set("event_timestamp", jsonMap.getLong("radius_event_timestamp", System.currentTimeMillis() / 1000));
- /*
- * 标识同一个连接:
- * 1.数据若存在acct_multi_session_id属性,取该属性
- * 2.不存在取 acct_session_id
- */
- if (jsonMap.containsKey(RadiusKnowledgeConfig.RADIUS_MULTI_SESSION_ID)) {
- knowledge.put("acct_session_id", jsonMap.getString("radius_acct_multi_session_id"));
- } else {
- knowledge.put("acct_session_id", jsonMap.getString("radius_acct_multi_session_id"));
- }
+ /*
+ * 标识同一个连接:
+ * 1.数据若存在acct_multi_session_id属性,取该属性
+ * 2.不存在取 acct_session_id
+ */
+ if (jsonMap.containsKey("radius_acct_multi_session_id")) {
+ knowledge.set("acct_session_id", jsonMap.getStr("radius_acct_multi_session_id"));
+ } else {
+ knowledge.set("acct_session_id", jsonMap.get("radius_acct_session_id"));
+ }
- /*
- 用户的在线时长,以秒为单位,下线用户无此属性,默认为0
- */
- if (jsonMap.containsKey(RadiusKnowledgeConfig.RADIUS_SESSION_TIME)) {
- knowledge.put("acct_session_time", jsonMap.getString("radius_acct_session_time"));
- } else {
- knowledge.put("acct_session_time", 0);
- }
+ //用户的在线时长,以秒为单位,下线用户无此属性,默认为0
+ knowledge.set("acct_session_time", jsonMap.getLong("radius_acct_session_time", 0L));
- return knowledge.toJSONString();
+ return knowledge.toString();
+ }
+ return "";
}
}
diff --git a/src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java b/src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java
index 7b7c046..aa5f234 100644
--- a/src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java
+++ b/src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java
@@ -19,9 +19,9 @@ public final class RadiusKnowledgeConfigurations {
public static String getStringProperty(Integer type, String key) {
if (type == 0) {
- return propService.getProperty(key);
+ return propService.getProperty(key).trim();
} else if (type == 1) {
- return propDefault.getProperty(key);
+ return propDefault.getProperty(key).trim();
} else {
return null;
}
@@ -30,9 +30,9 @@ public final class RadiusKnowledgeConfigurations {
public static Integer getIntProperty(Integer type, String key) {
if (type == 0) {
- return Integer.parseInt(propService.getProperty(key));
+ return Integer.parseInt(propService.getProperty(key).trim());
} else if (type == 1) {
- return Integer.parseInt(propDefault.getProperty(key));
+ return Integer.parseInt(propDefault.getProperty(key).trim());
} else {
return null;
}
@@ -40,9 +40,9 @@ public final class RadiusKnowledgeConfigurations {
public static Long getLongProperty(Integer type, String key) {
if (type == 0) {
- return Long.parseLong(propService.getProperty(key));
+ return Long.parseLong(propService.getProperty(key).trim());
} else if (type == 1) {
- return Long.parseLong(propDefault.getProperty(key));
+ return Long.parseLong(propDefault.getProperty(key).trim());
} else {
return null;
}
diff --git a/src/test/java/com/zdjizhi/FunctionTest.java b/src/test/java/com/zdjizhi/FunctionTest.java
deleted file mode 100644
index f1bba26..0000000
--- a/src/test/java/com/zdjizhi/FunctionTest.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package com.zdjizhi;
-
-import com.alibaba.fastjson.JSONObject;
-import com.jayway.jsonpath.Configuration;
-import com.jayway.jsonpath.JsonPath;
-import org.junit.Test;
-
-import java.util.Map;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi
- * @Description:
- * @date 2021/6/3011:46
- */
-public class FunctionTest {
-
- @Test
- public void jsonTest(){
- String logs = "{\"common_schema_type\":\"HTTP\",\"common_sessions\":1,\"http_request_line\":\"POST /portmap HTTP/1.1\",\"http_request_content_type\":\"application/json\",\"http_request_content_length\":21,\"http_user_agent\":\"Jakarta Commons-HttpClient/3.1\",\"http_host\":\"portmap.pro.testin.cn:8888\",\"http_url\":\"portmap.pro.testin.cn:8888/portmap\",\"http_response_line\":\"HTTP/1.1 200 OK\",\"http_response_content_length\":286,\"http_isn\":2504229113,\"http_proxy_flag\":0,\"http_version\":\"http1\",\"http_response_latency_ms\":2,\"http_session_duration_ms\":3,\"http_sequence\":1,\"common_protocol_label\":\"ETHERNET.VLAN.IPv4.TCP\",\"common_c2s_byte_diff\":555,\"common_c2s_pkt_diff\":5,\"common_s2c_byte_diff\":684,\"common_s2c_pkt_diff\":4,\"common_c2s_ipfrag_num\":0,\"common_s2c_ipfrag_num\":0,\"common_first_ttl\":64,\"common_c2s_tcp_unorder_num\":0,\"common_s2c_tcp_unorder_num\":0,\"common_c2s_tcp_lostlen\":0,\"common_s2c_tcp_lostlen\":0,\"common_c2s_pkt_retrans\":0,\"common_s2c_pkt_retrans\":0,\"common_c2s_byte_retrans\":0,\"common_s2c_byte_retrans\":0,\"common_direction\":69,\"common_l7_protocol\":\"HTTP\",\"common_tcp_client_isn\":2504229113,\"common_tcp_server_isn\":468823374,\"common_server_ip\":\"192.168.42.1\",\"common_client_ip\":\"192.168.56.14\",\"common_server_port\":8888,\"common_client_port\":51118,\"common_stream_dir\":3,\"common_address_type\":4,\"common_address_list\":\"51118-8888-192.168.56.14-192.168.42.1\",\"common_start_time\":1652247928,\"common_end_time\":1652247928,\"common_con_duration_ms\":4,\"common_s2c_pkt_num\":4,\"common_s2c_byte_num\":684,\"common_c2s_pkt_num\":5,\"common_c2s_byte_num\":555,\"common_establish_latency_ms\":2,\"http_action_file_size\":0,\"common_tunnels\":[{\"tunnels_schema_type\":\"VLAN\",\"vlan_c2s_direction_id\":[56],\"vlan_s2c_direction_id\":[56]},{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\",\"c2s_source_mac\":\"00:0e:c6:6f:dd:12\",\"c2s_destination_mac\":\"e8:1c:ba:cc:87:17\",\"s2c_source_mac\":\"e8:1c:ba:cc:87:17\",\"s2c_destination_mac\":\"00:0e:c6:6f:dd:12\"}],\"common_stream_trace_id\":\"869433624695175123\",\"common_l4_protocol\":\"IPv4_TCP\",\"common_sled_ip\":\"192.168.40.81\",\"common_device_id\":\"21426003\",\"common_device_tag\":\"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-three\\\"}]}\",\"common_policy_id\":0,\"common_service\":2,\"common_action\":0,\"common_link_info_c2s\":{\"functional_ip\":\"192.168.40.81\",\"thread_count\":43,\"relative_location\":\"internal\",\"hash_dist\":2,\"hash_algo\":0,\"linkinfo\":[{\"type\":\"ethernet\",\"src\":\"90:00:00:91:40:45\",\"dst\":\"0a:0a:0a:0a:00:0c\"},{\"type\":\"tuple4v4\",\"sip\":\"10.252.21.1\",\"dip\":\"10.10.0.12\",\"sport\":54789,\"dport\":4789},{\"type\":\"vxlan\",\"vlanid\":0,\"dir\":0,\"linkid\":22,\"linktype\":0},{\"type\":\"ethernet\",\"src\":\"00:0e:c6:6f:dd:12\",\"dst\":\"e8:1c:ba:cc:87:17\"},{\"type\":\"vlan\",\"vlan_id_array\":[56]}]},\"common_link_info_s2c\":{\"functional_ip\":\"192.168.40.81\",\"thread_count\":43,\"relative_location\":\"external\",\"hash_dist\":2,\"hash_algo\":0,\"linkinfo\":[{\"type\":\"ethernet\",\"src\":\"90:00:00:91:40:45\",\"dst\":\"0a:0a:0a:0a:00:0c\"},{\"type\":\"tuple4v4\",\"sip\":\"10.252.21.1\",\"dip\":\"10.10.0.12\",\"sport\":54789,\"dport\":4789},{\"type\":\"vxlan\",\"vlanid\":0,\"dir\":1,\"linkid\":22,\"linktype\":0},{\"type\":\"ethernet\",\"src\":\"e8:1c:ba:cc:87:17\",\"dst\":\"00:0e:c6:6f:dd:12\"},{\"type\":\"vlan\",\"vlan_id_array\":[56]}]}}\n";
- Object document = Configuration.defaultConfiguration().jsonProvider().parse(logs);
- System.out.println(JsonPath.read(document, "$.name").toString());
- JSONObject jsonObject = JSONObject.parseObject(logs);
- System.out.println(jsonObject.getString("common_schema_type"));
-
- }
-}