diff options
| author | qidaijie <[email protected]> | 2022-08-15 11:29:30 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2022-08-15 11:29:30 +0800 |
| commit | 0a13d028f4148fcdc9c377a972000501ae4d0704 (patch) | |
| tree | 46e38b85724a96e14835198537d8df67ad57019d | |
| parent | a21df7afe13ecc64eb321733905c904dc3e7202e (diff) | |
删除Fastjson,改为使用hutool工具类解析json。
| -rw-r--r-- | pom.xml | 16 | ||||
| -rw-r--r-- | properties/default_config.properties | 6 | ||||
| -rw-r--r-- | properties/service_flow_config.properties | 20 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java | 22 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java | 6 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java | 66 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java | 12 | ||||
| -rw-r--r-- | src/test/java/com/zdjizhi/FunctionTest.java | 27 |
8 files changed, 59 insertions, 116 deletions
@@ -6,7 +6,7 @@ <groupId>com.zdjizhi</groupId> <artifactId>radius-account-knowledge</artifactId> - <version>220413-sink</version> + <version>220815-JSON</version> <name>radius-account-knowledge</name> <url>http://www.example.com</url> @@ -38,6 +38,8 @@ <hadoop.version>2.7.1</hadoop.version> <kafka.version>1.0.0</kafka.version> <hbase.version>2.2.3</hbase.version> + <hutool.version>5.7.17</hutool.version> + <jsonpath.version>2.4.0</jsonpath.version> <!--<scope.type>provided</scope.type>--> <scope.type>compile</scope.type> </properties> @@ -129,13 +131,6 @@ </exclusions> </dependency> - <dependency> - <groupId>com.alibaba</groupId> - <artifactId>fastjson</artifactId> - <version>1.2.70</version> - </dependency> - - <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core --> <dependency> <groupId>org.apache.flink</groupId> @@ -193,14 +188,13 @@ <dependency> <groupId>com.jayway.jsonpath</groupId> <artifactId>json-path</artifactId> - <version>2.4.0</version> + <version>${jsonpath.version}</version> </dependency> - <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> - <version>5.5.2</version> + <version>${hutool.version}</version> </dependency> <dependency> diff --git a/properties/default_config.properties b/properties/default_config.properties index 787d12a..a683f33 100644 --- a/properties/default_config.properties +++ b/properties/default_config.properties @@ -29,6 +29,12 @@ buffer.memory=134217728 #10M max.request.size=10485760 +#������ѹ��ģʽ none or snappy +producer.kafka.compression.type=none + +#������ack +producer.ack=1 + #====================kafka default====================# #kafka SASL��֤�û���-���� kafka.user=nsyGpHKGFA4KW0zro9MDdw== diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 4beb64a..05f7466 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,27 +1,19 @@ -#--------------------------------地址配置------------------------------# - +#--------------------------------kafka消费组信息------------------------------# #管理kafka地址 source.kafka.servers=192.168.44.12:9094 -#管理输出kafka地址 -sink.kafka.servers=192.168.44.12:9094 - -#--------------------------------Kafka消费组信息------------------------------# - #kafka 接收数据topic source.kafka.topic=test -#补全数据 输出 topic -sink.kafka.topic=RADIUS-ONFF - #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; group.id=radius-on-off-flink-20210615 -#生产者压缩模式 none or snappy -producer.kafka.compression.type=none +#--------------------------------Kafka生产者信息------------------------------# +#管理输出kafka地址 +sink.kafka.servers=192.168.44.12:9094 -#生产者ack -producer.ack=1 +#补全数据 输出 topic +sink.kafka.topic=RADIUS-ONFF #--------------------------------topology配置------------------------------# #定位库地址 diff --git a/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java b/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java index cb9ead3..04ff500 100644 --- a/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java +++ b/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java @@ -32,28 +32,12 @@ public class RadiusKnowledgeConfig { * 2、停止计费 */ public static final int STOP_BILLING = 2; + /** * 报文类型 */ public static final String RADIUS_PACKET_TYPE = "radius_packet_type"; - /** - * 计费请求报文类型 - */ - public static final String RADIUS_ACCT_STATUS_TYPE = "radius_acct_status_type"; - /** - * 发送计费请求报文时间戳 - */ - public static final String RADIUS_EVENT_TIMESTAMP = "radius_event_timestamp"; - /** - * 一个用户多个计费ID关联属性 - */ - public static final String RADIUS_MULTI_SESSION_ID = "radius_acct_multi_session_id"; - - /** - * 用户的在线时长,以秒为单位 - */ - public static final String RADIUS_SESSION_TIME = "radius_acct_session_time"; /** @@ -64,8 +48,8 @@ public class RadiusKnowledgeConfig { public static final String SINK_KAFKA_SERVERS = RadiusKnowledgeConfigurations.getStringProperty(0, "sink.kafka.servers"); public static final String SINK_KAFKA_TOPIC = RadiusKnowledgeConfigurations.getStringProperty(0, "sink.kafka.topic"); public static final String GROUP_ID = RadiusKnowledgeConfigurations.getStringProperty(0, "group.id"); - public static final String PRODUCER_ACK = RadiusKnowledgeConfigurations.getStringProperty(0, "producer.ack"); - public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = RadiusKnowledgeConfigurations.getStringProperty(0, "producer.kafka.compression.type"); + public static final String PRODUCER_ACK = RadiusKnowledgeConfigurations.getStringProperty(1, "producer.ack"); + public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = RadiusKnowledgeConfigurations.getStringProperty(1, "producer.kafka.compression.type"); /** * default config diff --git a/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java b/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java index 3027bfa..afd714a 100644 --- a/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java +++ b/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java @@ -2,6 +2,7 @@ package com.zdjizhi.topology; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.zdjizhi.common.RadiusKnowledgeConfig; import com.zdjizhi.utils.functions.FilterNullFunction; import com.zdjizhi.utils.functions.MapCompletedFunction; import com.zdjizhi.utils.kafka.KafkaConsumer; @@ -24,10 +25,10 @@ public class RadiusKnowledgeTopology { DataStreamSource<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer()); - DataStream<String> result = streamSource.map(new MapCompletedFunction()).name("RadiusOnOffMap") + DataStream<String> result = streamSource.map(new MapCompletedFunction()).name(RadiusKnowledgeConfig.SOURCE_KAFKA_TOPIC) .filter(new FilterNullFunction()).name("FilterAbnormalData"); - result.addSink(KafkaProducer.getKafkaProducer()).name("LogSink"); + result.addSink(KafkaProducer.getKafkaProducer()).name(RadiusKnowledgeConfig.SINK_KAFKA_TOPIC); try { environment.execute("RADIUS-ON-OFF"); @@ -37,5 +38,4 @@ public class RadiusKnowledgeTopology { } - } diff --git a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java index a875d0f..7131c08 100644 --- a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java @@ -1,8 +1,8 @@ package com.zdjizhi.utils.functions; +import cn.hutool.json.JSONObject; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.alibaba.fastjson.JSONObject; import com.zdjizhi.common.RadiusKnowledgeConfig; import com.zdjizhi.utils.StringUtil; import org.apache.flink.api.common.functions.MapFunction; @@ -21,17 +21,17 @@ public class MapCompletedFunction implements MapFunction<String, String> { String result = null; try { if (StringUtil.isNotBlank(logs)) { - JSONObject jsonMap = JSONObject.parseObject(logs); - if (jsonMap.containsKey(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE)) { - int packetType = jsonMap.getInteger(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE); + JSONObject json = new JSONObject(logs, false, true); + if (json.containsKey(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE)) { + int packetType = json.getInt(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE); if (RadiusKnowledgeConfig.ACCOUNTING_REQUEST == packetType) { - result = GetKnowledgeLogs(jsonMap); + result = GetKnowledgeLogs(json); } } } } catch (RuntimeException e) { - logger.error("数据解析异常,异常信息:" + e); + logger.error("Radius log parsing exception,Message is :" + e); } return result; } @@ -44,42 +44,36 @@ public class MapCompletedFunction implements MapFunction<String, String> { */ private static String GetKnowledgeLogs(JSONObject jsonMap) { JSONObject knowledge = new JSONObject(); + String framedIp = jsonMap.getStr("radius_framed_ip"); + String account = jsonMap.getStr("radius_account"); - knowledge.put("framed_ip", jsonMap.getString("radius_framed_ip")); + if (StringUtil.isNotBlank(framedIp) && StringUtil.isNotBlank(account)) { + knowledge.set("framed_ip", framedIp); - knowledge.put("account", jsonMap.getString("radius_account")); + knowledge.set("account", account); - knowledge.put("acct_status_type", jsonMap.getString("radius_acct_status_type")); + //计费请求报文类型 + knowledge.set("acct_status_type", jsonMap.getStr("radius_acct_status_type")); - /* - 如果存在时间戳则选择此时间戳没有获取当前时间 - */ - if (jsonMap.containsKey(RadiusKnowledgeConfig.RADIUS_EVENT_TIMESTAMP)) { - knowledge.put("event_timestamp", jsonMap.getString("radius_event_timestamp")); - } else { - knowledge.put("event_timestamp", (System.currentTimeMillis() / 1000)); - } + //如果存在时间戳则选择此时间戳没有获取当前时间 + knowledge.set("event_timestamp", jsonMap.getLong("radius_event_timestamp", System.currentTimeMillis() / 1000)); - /* - * 标识同一个连接: - * 1.数据若存在acct_multi_session_id属性,取该属性 - * 2.不存在取 acct_session_id - */ - if (jsonMap.containsKey(RadiusKnowledgeConfig.RADIUS_MULTI_SESSION_ID)) { - knowledge.put("acct_session_id", jsonMap.getString("radius_acct_multi_session_id")); - } else { - knowledge.put("acct_session_id", jsonMap.getString("radius_acct_multi_session_id")); - } + /* + * 标识同一个连接: + * 1.数据若存在acct_multi_session_id属性,取该属性 + * 2.不存在取 acct_session_id + */ + if (jsonMap.containsKey("radius_acct_multi_session_id")) { + knowledge.set("acct_session_id", jsonMap.getStr("radius_acct_multi_session_id")); + } else { + knowledge.set("acct_session_id", jsonMap.get("radius_acct_session_id")); + } - /* - 用户的在线时长,以秒为单位,下线用户无此属性,默认为0 - */ - if (jsonMap.containsKey(RadiusKnowledgeConfig.RADIUS_SESSION_TIME)) { - knowledge.put("acct_session_time", jsonMap.getString("radius_acct_session_time")); - } else { - knowledge.put("acct_session_time", 0); - } + //用户的在线时长,以秒为单位,下线用户无此属性,默认为0 + knowledge.set("acct_session_time", jsonMap.getLong("radius_acct_session_time", 0L)); - return knowledge.toJSONString(); + return knowledge.toString(); + } + return ""; } } diff --git a/src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java b/src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java index 7b7c046..aa5f234 100644 --- a/src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java +++ b/src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java @@ -19,9 +19,9 @@ public final class RadiusKnowledgeConfigurations { public static String getStringProperty(Integer type, String key) { if (type == 0) { - return propService.getProperty(key); + return propService.getProperty(key).trim(); } else if (type == 1) { - return propDefault.getProperty(key); + return propDefault.getProperty(key).trim(); } else { return null; } @@ -30,9 +30,9 @@ public final class RadiusKnowledgeConfigurations { public static Integer getIntProperty(Integer type, String key) { if (type == 0) { - return Integer.parseInt(propService.getProperty(key)); + return Integer.parseInt(propService.getProperty(key).trim()); } else if (type == 1) { - return Integer.parseInt(propDefault.getProperty(key)); + return Integer.parseInt(propDefault.getProperty(key).trim()); } else { return null; } @@ -40,9 +40,9 @@ public final class RadiusKnowledgeConfigurations { public static Long getLongProperty(Integer type, String key) { if (type == 0) { - return Long.parseLong(propService.getProperty(key)); + return Long.parseLong(propService.getProperty(key).trim()); } else if (type == 1) { - return Long.parseLong(propDefault.getProperty(key)); + return Long.parseLong(propDefault.getProperty(key).trim()); } else { return null; } diff --git a/src/test/java/com/zdjizhi/FunctionTest.java b/src/test/java/com/zdjizhi/FunctionTest.java deleted file mode 100644 index f1bba26..0000000 --- a/src/test/java/com/zdjizhi/FunctionTest.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.zdjizhi; - -import com.alibaba.fastjson.JSONObject; -import com.jayway.jsonpath.Configuration; -import com.jayway.jsonpath.JsonPath; -import org.junit.Test; - -import java.util.Map; - -/** - * @author qidaijie - * @Package com.zdjizhi - * @Description: - * @date 2021/6/3011:46 - */ -public class FunctionTest { - - @Test - public void jsonTest(){ - String logs = "{\"common_schema_type\":\"HTTP\",\"common_sessions\":1,\"http_request_line\":\"POST /portmap HTTP/1.1\",\"http_request_content_type\":\"application/json\",\"http_request_content_length\":21,\"http_user_agent\":\"Jakarta Commons-HttpClient/3.1\",\"http_host\":\"portmap.pro.testin.cn:8888\",\"http_url\":\"portmap.pro.testin.cn:8888/portmap\",\"http_response_line\":\"HTTP/1.1 200 OK\",\"http_response_content_length\":286,\"http_isn\":2504229113,\"http_proxy_flag\":0,\"http_version\":\"http1\",\"http_response_latency_ms\":2,\"http_session_duration_ms\":3,\"http_sequence\":1,\"common_protocol_label\":\"ETHERNET.VLAN.IPv4.TCP\",\"common_c2s_byte_diff\":555,\"common_c2s_pkt_diff\":5,\"common_s2c_byte_diff\":684,\"common_s2c_pkt_diff\":4,\"common_c2s_ipfrag_num\":0,\"common_s2c_ipfrag_num\":0,\"common_first_ttl\":64,\"common_c2s_tcp_unorder_num\":0,\"common_s2c_tcp_unorder_num\":0,\"common_c2s_tcp_lostlen\":0,\"common_s2c_tcp_lostlen\":0,\"common_c2s_pkt_retrans\":0,\"common_s2c_pkt_retrans\":0,\"common_c2s_byte_retrans\":0,\"common_s2c_byte_retrans\":0,\"common_direction\":69,\"common_l7_protocol\":\"HTTP\",\"common_tcp_client_isn\":2504229113,\"common_tcp_server_isn\":468823374,\"common_server_ip\":\"192.168.42.1\",\"common_client_ip\":\"192.168.56.14\",\"common_server_port\":8888,\"common_client_port\":51118,\"common_stream_dir\":3,\"common_address_type\":4,\"common_address_list\":\"51118-8888-192.168.56.14-192.168.42.1\",\"common_start_time\":1652247928,\"common_end_time\":1652247928,\"common_con_duration_ms\":4,\"common_s2c_pkt_num\":4,\"common_s2c_byte_num\":684,\"common_c2s_pkt_num\":5,\"common_c2s_byte_num\":555,\"common_establish_latency_ms\":2,\"http_action_file_size\":0,\"common_tunnels\":[{\"tunnels_schema_type\":\"VLAN\",\"vlan_c2s_direction_id\":[56],\"vlan_s2c_direction_id\":[56]},{\"tunnels_schema_type\":\"MULTIPATH_ETHERNET\",\"c2s_source_mac\":\"00:0e:c6:6f:dd:12\",\"c2s_destination_mac\":\"e8:1c:ba:cc:87:17\",\"s2c_source_mac\":\"e8:1c:ba:cc:87:17\",\"s2c_destination_mac\":\"00:0e:c6:6f:dd:12\"}],\"common_stream_trace_id\":\"869433624695175123\",\"common_l4_protocol\":\"IPv4_TCP\",\"common_sled_ip\":\"192.168.40.81\",\"common_device_id\":\"21426003\",\"common_device_tag\":\"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-three\\\"}]}\",\"common_policy_id\":0,\"common_service\":2,\"common_action\":0,\"common_link_info_c2s\":{\"functional_ip\":\"192.168.40.81\",\"thread_count\":43,\"relative_location\":\"internal\",\"hash_dist\":2,\"hash_algo\":0,\"linkinfo\":[{\"type\":\"ethernet\",\"src\":\"90:00:00:91:40:45\",\"dst\":\"0a:0a:0a:0a:00:0c\"},{\"type\":\"tuple4v4\",\"sip\":\"10.252.21.1\",\"dip\":\"10.10.0.12\",\"sport\":54789,\"dport\":4789},{\"type\":\"vxlan\",\"vlanid\":0,\"dir\":0,\"linkid\":22,\"linktype\":0},{\"type\":\"ethernet\",\"src\":\"00:0e:c6:6f:dd:12\",\"dst\":\"e8:1c:ba:cc:87:17\"},{\"type\":\"vlan\",\"vlan_id_array\":[56]}]},\"common_link_info_s2c\":{\"functional_ip\":\"192.168.40.81\",\"thread_count\":43,\"relative_location\":\"external\",\"hash_dist\":2,\"hash_algo\":0,\"linkinfo\":[{\"type\":\"ethernet\",\"src\":\"90:00:00:91:40:45\",\"dst\":\"0a:0a:0a:0a:00:0c\"},{\"type\":\"tuple4v4\",\"sip\":\"10.252.21.1\",\"dip\":\"10.10.0.12\",\"sport\":54789,\"dport\":4789},{\"type\":\"vxlan\",\"vlanid\":0,\"dir\":1,\"linkid\":22,\"linktype\":0},{\"type\":\"ethernet\",\"src\":\"e8:1c:ba:cc:87:17\",\"dst\":\"00:0e:c6:6f:dd:12\"},{\"type\":\"vlan\",\"vlan_id_array\":[56]}]}}\n"; - Object document = Configuration.defaultConfiguration().jsonProvider().parse(logs); - System.out.println(JsonPath.read(document, "$.name").toString()); - JSONObject jsonObject = JSONObject.parseObject(logs); - System.out.println(jsonObject.getString("common_schema_type")); - - } -} |
