From beeff0a9e03d3016caa4bb0aa5327b8a6ee0b8a0 Mon Sep 17 00:00:00 2001 From: qidaijie Date: Mon, 27 Sep 2021 11:15:46 +0800 Subject: 鎻愪氦2109鐗堟湰 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 223 +++++++++++++++++++++ properties/default_config.properties | 29 +++ properties/service_flow_config.properties | 28 +++ .../java/com/zdjizhi/bean/RadiusKnowledge.java | 62 ++++++ .../com/zdjizhi/common/RadiusKnowledgeConfig.java | 76 +++++++ .../zdjizhi/topology/RadiusKnowledgeTopology.java | 46 +++++ .../zdjizhi/utils/functions/FilterLogFunction.java | 40 ++++ .../utils/functions/FilterNullFunction.java | 17 ++ .../utils/functions/MapCompletedFunction.java | 55 +++++ .../java/com/zdjizhi/utils/kafka/CertUtils.java | 36 ++++ .../java/com/zdjizhi/utils/kafka/Consumer.java | 41 ++++ .../java/com/zdjizhi/utils/kafka/Producer.java | 48 +++++ .../system/RadiusKnowledgeConfigurations.java | 70 +++++++ src/main/log4j.properties | 25 +++ src/test/java/com/zdjizhi/FunctionTest.java | 24 +++ 15 files changed, 820 insertions(+) create mode 100644 pom.xml create mode 100644 properties/default_config.properties create mode 100644 properties/service_flow_config.properties create mode 100644 src/main/java/com/zdjizhi/bean/RadiusKnowledge.java create mode 100644 src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java create mode 100644 src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java create mode 100644 src/main/java/com/zdjizhi/utils/functions/FilterLogFunction.java create mode 100644 src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java create mode 100644 src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java create mode 100644 src/main/java/com/zdjizhi/utils/kafka/CertUtils.java create mode 100644 src/main/java/com/zdjizhi/utils/kafka/Consumer.java create mode 100644 src/main/java/com/zdjizhi/utils/kafka/Producer.java create mode 100644 src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java create mode 100644 src/main/log4j.properties create mode 100644 src/test/java/com/zdjizhi/FunctionTest.java diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..1062c03 --- /dev/null +++ b/pom.xml @@ -0,0 +1,223 @@ + + + + 4.0.0 + + com.zdjizhi + radius-account-knowledge + 210908-security + + radius-account-knowledge + http://www.example.com + + + + + nexus + Team Nexus Repository + http://192.168.40.125:8099/content/groups/public + + + + maven-ali + http://maven.aliyun.com/nexus/content/groups/public/ + + + + + + fail + + + + + + UTF-8 + 1.13.1 + 2.7.1 + 1.0.0 + 2.2.3 + + compile + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 2.4.2 + + + package + + shade + + + + + com.zdjizhi.topology.RadiusKnowledgeTopology + + + + + + + + + io.github.zlika + reproducible-build-maven-plugin + 0.2 + + + + strip-jar + + package + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 2.3.2 + + 1.8 + 1.8 + + + + + + properties + + **/*.properties + **/*.xml + + false + + + + src\main\java + + log4j.properties + + false + + + + + + + + com.zdjizhi + galaxy + 1.0.6 + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + + + + com.alibaba + fastjson + 1.2.70 + + + + + + + + + + + + + + org.apache.flink + flink-core + ${flink.version} + ${scope.type} + + + + + + org.apache.flink + flink-streaming-java_2.12 + ${flink.version} + ${scope.type} + + + + + org.apache.flink + flink-clients_2.12 + ${flink.version} + ${scope.type} + + + + + org.apache.flink + flink-connector-kafka_2.12 + ${flink.version} + ${scope.type} + + + + + org.apache.flink + flink-java + ${flink.version} + ${scope.type} + + + + cglib + cglib-nodep + 3.2.4 + + + + org.junit.jupiter + junit-jupiter-api + 5.3.2 + compile + + + + com.jayway.jsonpath + json-path + 2.4.0 + + + + + cn.hutool + hutool-all + 5.5.2 + + + + junit + junit + 4.12 + test + + + + + diff --git a/properties/default_config.properties b/properties/default_config.properties new file mode 100644 index 0000000..021eebd --- /dev/null +++ b/properties/default_config.properties @@ -0,0 +1,29 @@ +#producer重试的次数设置 +retries=0 + +#他的含义就是说一个Batch被创建之后,最多过多久,不管这个Batch有没有写满,都必须发送出去了 +linger.ms=5 + +#如果在超时之前未收到响应,客户端将在必要时重新发送请求 +request.timeout.ms=30000 + +#producer都是按照batch进行发送的,批次大小,默认:16384 +batch.size=262144 + +#Producer端用于缓存消息的缓冲区大小 +buffer.memory=67108864 + +#这个参数决定了每次发送给Kafka服务器请求的最大大小,默认1048576 +max.request.size=5242880 + +#kafka SASL验证用户名 +kafka.user=admin + +#kafka SASL及SSL验证密码 +kafka.pin=galaxy2019 + +#kafka sink protocol; SSL or SASL +kafka.source.protocol=SASL + +#kafka sink protocol; SSL or SASL +kafka.sink.protocol=SASL \ No newline at end of file diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties new file mode 100644 index 0000000..eb2cb4c --- /dev/null +++ b/properties/service_flow_config.properties @@ -0,0 +1,28 @@ +#--------------------------------鍦板潃閰嶇疆------------------------------# + +#绠$悊kafka鍦板潃 +input.kafka.servers=192.168.44.12:9092 + +#绠$悊杈撳嚭kafka鍦板潃 +output.kafka.servers=192.168.44.12:9092 + +#--------------------------------Kafka娑堣垂缁勪俊鎭------------------------------# + +#kafka 鎺ユ敹鏁版嵁topic +input.kafka.topic=RADIUS-RECORD + +#琛ュ叏鏁版嵁 杈撳嚭 topic +output.kafka.topic=RADIUS-ONFF-LOG + +#璇诲彇topic,瀛樺偍璇pout id鐨勬秷璐筼ffset淇℃伅锛屽彲閫氳繃璇ユ嫇鎵戝懡鍚;鍏蜂綋瀛樺偍offset鐨勪綅缃紝纭畾涓嬫璇诲彇涓嶉噸澶嶇殑鏁版嵁锛 +group.id=radius-on-off-flink-20210615 + +#鐢熶骇鑰呭帇缂╂ā寮 none or snappy +producer.kafka.compression.type=none + +#鐢熶骇鑰卆ck +producer.ack=1 + +#--------------------------------topology閰嶇疆------------------------------# +#绗笁鏂瑰伐鍏峰湴鍧 +tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\ \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/bean/RadiusKnowledge.java b/src/main/java/com/zdjizhi/bean/RadiusKnowledge.java new file mode 100644 index 0000000..43e71a1 --- /dev/null +++ b/src/main/java/com/zdjizhi/bean/RadiusKnowledge.java @@ -0,0 +1,62 @@ +package com.zdjizhi.bean; + +/** + * @author qidaijie + */ +public class RadiusKnowledge { + + private String framed_ip; + private String account; + private String acct_session_id; + private long acct_status_type; + private long acct_session_time; + private long event_timestamp; + + public String getFramed_ip() { + return framed_ip; + } + + public void setFramed_ip(String framed_ip) { + this.framed_ip = framed_ip; + } + + public String getAccount() { + return account; + } + + public void setAccount(String account) { + this.account = account; + } + + public String getAcct_session_id() { + return acct_session_id; + } + + public void setAcct_session_id(String acct_session_id) { + this.acct_session_id = acct_session_id; + } + + public long getAcct_status_type() { + return acct_status_type; + } + + public void setAcct_status_type(long acct_status_type) { + this.acct_status_type = acct_status_type; + } + + public long getAcct_session_time() { + return acct_session_time; + } + + public void setAcct_session_time(long acct_session_time) { + this.acct_session_time = acct_session_time; + } + + public long getEvent_timestamp() { + return event_timestamp; + } + + public void setEvent_timestamp(long event_timestamp) { + this.event_timestamp = event_timestamp; + } +} diff --git a/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java b/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java new file mode 100644 index 0000000..d267b6b --- /dev/null +++ b/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java @@ -0,0 +1,76 @@ +package com.zdjizhi.common; + + +import com.zdjizhi.utils.system.RadiusKnowledgeConfigurations; + +/** + * @author Administrator + */ +public class RadiusKnowledgeConfig { + /** + * 4- Accounting-Request(璐︽埛鎺堟潈) + */ + public static final int ACCOUNTING_REQUEST = 4; + /** + * 1銆佸紑濮嬭璐 + */ + public static final int START_BILLING = 1; + /** + * 2銆佸仠姝㈣璐 + */ + public static final int STOP_BILLING = 2; + /** + * 鎶ユ枃绫诲瀷 + */ + public static final String RADIUS_PACKET_TYPE = "radius_packet_type"; + /** + * 璁¤垂璇锋眰鎶ユ枃绫诲瀷 + */ + public static final String RADIUS_ACCT_STATUS_TYPE = "radius_acct_status_type"; + /** + * 鍙戦佽璐硅姹傛姤鏂囨椂闂存埑 + */ + public static final String RADIUS_EVENT_TIMESTAMP = "radius_event_timestamp"; + + /** + * 涓涓敤鎴峰涓璐笽D鍏宠仈灞炴 + */ + public static final String RADIUS_MULTI_SESSION_ID = "radius_acct_multi_session_id"; + + /** + * 鐢ㄦ埛鐨勫湪绾挎椂闀匡紝浠ョ涓哄崟浣 + */ + public static final String RADIUS_SESSION_TIME = "radius_acct_session_time"; + + + /** + * System + */ + + /** + * kafka + */ + public static final String INPUT_KAFKA_SERVERS = RadiusKnowledgeConfigurations.getStringProperty(0, "input.kafka.servers"); + public static final String OUTPUT_KAFKA_SERVERS = RadiusKnowledgeConfigurations.getStringProperty(0, "output.kafka.servers"); + public static final String GROUP_ID = RadiusKnowledgeConfigurations.getStringProperty(0, "group.id"); + public static final String OUTPUT_KAFKA_TOPIC = RadiusKnowledgeConfigurations.getStringProperty(0, "output.kafka.topic"); + public static final String INPUT_KAFKA_TOPIC = RadiusKnowledgeConfigurations.getStringProperty(0, "input.kafka.topic"); + public static final String PRODUCER_ACK = RadiusKnowledgeConfigurations.getStringProperty(0, "producer.ack"); + public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = RadiusKnowledgeConfigurations.getStringProperty(0, "producer.kafka.compression.type"); + + /** + * default config + */ + public static final String RETRIES = RadiusKnowledgeConfigurations.getStringProperty(1, "retries"); + public static final String LINGER_MS = RadiusKnowledgeConfigurations.getStringProperty(1, "linger.ms"); + public static final Integer REQUEST_TIMEOUT_MS = RadiusKnowledgeConfigurations.getIntProperty(1, "request.timeout.ms"); + public static final Integer BATCH_SIZE = RadiusKnowledgeConfigurations.getIntProperty(1, "batch.size"); + public static final Integer BUFFER_MEMORY = RadiusKnowledgeConfigurations.getIntProperty(1, "buffer.memory"); + public static final Integer MAX_REQUEST_SIZE = RadiusKnowledgeConfigurations.getIntProperty(1, "max.request.size"); + public static final String TOOLS_LIBRARY = RadiusKnowledgeConfigurations.getStringProperty(0, "tools.library"); + public static final String KAFKA_SOURCE_PROTOCOL = RadiusKnowledgeConfigurations.getStringProperty(1, "kafka.source.protocol"); + public static final String KAFKA_SINK_PROTOCOL = RadiusKnowledgeConfigurations.getStringProperty(1, "kafka.sink.protocol"); + public static final String KAFKA_USER = RadiusKnowledgeConfigurations.getStringProperty(1, "kafka.user"); + public static final String KAFKA_PIN = RadiusKnowledgeConfigurations.getStringProperty(1, "kafka.pin"); + +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java b/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java new file mode 100644 index 0000000..77418a6 --- /dev/null +++ b/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java @@ -0,0 +1,46 @@ +package com.zdjizhi.topology; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.RadiusKnowledgeConfig; +import com.zdjizhi.utils.functions.FilterLogFunction; +import com.zdjizhi.utils.functions.FilterNullFunction; +import com.zdjizhi.utils.functions.MapCompletedFunction; +import com.zdjizhi.utils.kafka.Consumer; +import com.zdjizhi.utils.kafka.Producer; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +/** + * @author qidaijie + * @Package com.zdjizhi.topology + * @Description: + * @date 2021/5/2016:42 + */ +public class RadiusKnowledgeTopology { + private static final Log logger = LogFactory.get(); + + public static void main(String[] args) { + final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); + +// environment.enableCheckpointing(5000); + + DataStreamSource streamSource = environment.addSource(Consumer.getKafkaConsumer()); + + DataStream result = streamSource.filter(new FilterLogFunction()).name("FilterOriginalData") + .map(new MapCompletedFunction()).name("RadiusOnOffMap") + .filter(new FilterNullFunction()).name("FilterAbnormalData"); + + result.addSink(Producer.getKafkaProducer()).name("LogSink"); + + try { + environment.execute("RADIUS-ON-OFF"); + } catch (Exception e) { + logger.error("This Flink task start ERROR! Exception information is :" + e); + } + + } + + +} diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterLogFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterLogFunction.java new file mode 100644 index 0000000..723ceaa --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/FilterLogFunction.java @@ -0,0 +1,40 @@ +package com.zdjizhi.utils.functions; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.common.RadiusKnowledgeConfig; +import com.zdjizhi.utils.StringUtil; +import org.apache.flink.api.common.functions.FilterFunction; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/5/2715:01 + */ +public class FilterLogFunction implements FilterFunction { + private static final Log logger = LogFactory.get(); + + @Override + public boolean filter(String message) { + boolean legitimate = false; + try { + if (StringUtil.isNotBlank(message)) { + JSONObject jsonObject = JSONObject.parseObject(message); + if (jsonObject.containsKey(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE)) { + int packetType = jsonObject.getInteger(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE); + if (RadiusKnowledgeConfig.ACCOUNTING_REQUEST == packetType) { + int statusType = jsonObject.getInteger(RadiusKnowledgeConfig.RADIUS_ACCT_STATUS_TYPE); + if (RadiusKnowledgeConfig.START_BILLING == statusType || RadiusKnowledgeConfig.STOP_BILLING == statusType) { + legitimate = true; + } + } + } + } + } catch (RuntimeException re) { + logger.error("鏁版嵁瑙f瀽寮傚父,寮傚父淇℃伅:" + re); + } + return legitimate; + } +} diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java new file mode 100644 index 0000000..de507ad --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java @@ -0,0 +1,17 @@ +package com.zdjizhi.utils.functions; + +import com.zdjizhi.utils.StringUtil; +import org.apache.flink.api.common.functions.FilterFunction; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/5/2715:01 + */ +public class FilterNullFunction implements FilterFunction { + @Override + public boolean filter(String message) { + return StringUtil.isNotBlank(message); + } +} diff --git a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java new file mode 100644 index 0000000..e58419c --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java @@ -0,0 +1,55 @@ +package com.zdjizhi.utils.functions; + +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.bean.RadiusKnowledge; +import com.zdjizhi.common.RadiusKnowledgeConfig; +import org.apache.flink.api.common.functions.MapFunction; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/5/2715:01 + */ +public class MapCompletedFunction implements MapFunction { + + @Override + public String map(String logs) { + try { + JSONObject jsonObject = JSONObject.parseObject(logs); + RadiusKnowledge knowledge = new RadiusKnowledge(); + knowledge.setFramed_ip(jsonObject.getString("radius_framed_ip")); + knowledge.setAccount(jsonObject.getString("radius_account")); + knowledge.setAcct_status_type(jsonObject.getInteger("radius_acct_status_type")); + /* + 濡傛灉瀛樺湪鏃堕棿鎴冲垯閫夋嫨姝ゆ椂闂存埑娌℃湁鑾峰彇褰撳墠鏃堕棿 + */ + if (jsonObject.containsKey(RadiusKnowledgeConfig.RADIUS_EVENT_TIMESTAMP)) { + knowledge.setEvent_timestamp(jsonObject.getLong("radius_event_timestamp")); + } else { + knowledge.setEvent_timestamp(System.currentTimeMillis() / 1000); + } + /* + * 鏍囪瘑鍚屼竴涓繛鎺ワ細 + * 1.鏁版嵁鑻ュ瓨鍦╝cct_multi_session_id灞炴э紝鍙栬灞炴 + * 2.涓嶅瓨鍦ㄥ彇 acct_session_id + */ + if (jsonObject.containsKey(RadiusKnowledgeConfig.RADIUS_MULTI_SESSION_ID)) { + knowledge.setAcct_session_id(jsonObject.getString("radius_acct_multi_session_id")); + } else { + knowledge.setAcct_session_id(jsonObject.getString("radius_acct_session_id")); + } + /* + 鐢ㄦ埛鐨勫湪绾挎椂闀匡紝浠ョ涓哄崟浣,涓嬬嚎鐢ㄦ埛鏃犳灞炴э紝榛樿涓0 + */ + if (jsonObject.containsKey(RadiusKnowledgeConfig.RADIUS_SESSION_TIME)) { + knowledge.setAcct_session_time(jsonObject.getInteger("radius_acct_session_time")); + } else { + knowledge.setAcct_session_time(0); + } + return JSONObject.toJSONString(knowledge); + } catch (RuntimeException e) { + return ""; + } + } +} diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java new file mode 100644 index 0000000..a325dd6 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java @@ -0,0 +1,36 @@ +package com.zdjizhi.utils.kafka; + +import com.zdjizhi.common.RadiusKnowledgeConfig; +import org.apache.kafka.common.config.SslConfigs; + +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2021/9/610:37 + */ +class CertUtils { + static void chooseCert(String type, Properties properties) { + switch (type) { + case "SSL": + properties.put("security.protocol", "SSL"); + properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); + properties.put("ssl.keystore.location", RadiusKnowledgeConfig.TOOLS_LIBRARY + "keystore.jks"); + properties.put("ssl.keystore.password", RadiusKnowledgeConfig.KAFKA_PIN); + properties.put("ssl.truststore.location", RadiusKnowledgeConfig.TOOLS_LIBRARY + "truststore.jks"); + properties.put("ssl.truststore.password", RadiusKnowledgeConfig.KAFKA_PIN); + properties.put("ssl.key.password", RadiusKnowledgeConfig.KAFKA_PIN); + break; + case "SASL": + properties.put("security.protocol", "SASL_PLAINTEXT"); + properties.put("sasl.mechanism", "PLAIN"); + properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + + RadiusKnowledgeConfig.KAFKA_USER + " password=" + RadiusKnowledgeConfig.KAFKA_PIN + ";"); + break; + default: + } + + } +} diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java new file mode 100644 index 0000000..7e75117 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java @@ -0,0 +1,41 @@ +package com.zdjizhi.utils.kafka; + +import com.zdjizhi.common.RadiusKnowledgeConfig; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; + +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2021/6/813:54 + */ +public class Consumer { + private static Properties createConsumerConfig() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", RadiusKnowledgeConfig.INPUT_KAFKA_SERVERS); + properties.put("group.id", RadiusKnowledgeConfig.GROUP_ID); + properties.put("session.timeout.ms", "60000"); + properties.put("max.poll.records", 3000); + properties.put("max.partition.fetch.bytes", 31457280); + properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + + CertUtils.chooseCert(RadiusKnowledgeConfig.KAFKA_SOURCE_PROTOCOL,properties); + + return properties; + } + + public static FlinkKafkaConsumer getKafkaConsumer() { + FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(RadiusKnowledgeConfig.INPUT_KAFKA_TOPIC, + new SimpleStringSchema(), createConsumerConfig()); + + kafkaConsumer.setCommitOffsetsOnCheckpoints(true); + kafkaConsumer.setStartFromGroupOffsets(); + + return kafkaConsumer; + } +} diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/Producer.java new file mode 100644 index 0000000..ce8fab3 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/kafka/Producer.java @@ -0,0 +1,48 @@ +package com.zdjizhi.utils.kafka; + +import com.zdjizhi.common.RadiusKnowledgeConfig; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; + +import java.util.Optional; +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2021/6/814:04 + */ +public class Producer { + + private static Properties createProducerConfig() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", RadiusKnowledgeConfig.OUTPUT_KAFKA_SERVERS); + properties.put("acks", RadiusKnowledgeConfig.PRODUCER_ACK); + properties.put("retries", RadiusKnowledgeConfig.RETRIES); + properties.put("linger.ms", RadiusKnowledgeConfig.LINGER_MS); + properties.put("request.timeout.ms", RadiusKnowledgeConfig.REQUEST_TIMEOUT_MS); + properties.put("batch.size", RadiusKnowledgeConfig.BATCH_SIZE); + properties.put("buffer.memory", RadiusKnowledgeConfig.BUFFER_MEMORY); + properties.put("max.request.size", RadiusKnowledgeConfig.MAX_REQUEST_SIZE); + properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, RadiusKnowledgeConfig.PRODUCER_KAFKA_COMPRESSION_TYPE); + + CertUtils.chooseCert(RadiusKnowledgeConfig.KAFKA_SINK_PROTOCOL, properties); + + return properties; + } + + + public static FlinkKafkaProducer getKafkaProducer() { + FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer( + RadiusKnowledgeConfig.OUTPUT_KAFKA_TOPIC, + new SimpleStringSchema(), + createProducerConfig(), Optional.empty()); + + kafkaProducer.setLogFailuresOnly(false); +// kafkaProducer.setWriteTimestampToKafka(true); + + return kafkaProducer; + } +} diff --git a/src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java b/src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java new file mode 100644 index 0000000..7b7c046 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java @@ -0,0 +1,70 @@ +package com.zdjizhi.utils.system; + +import com.zdjizhi.utils.StringUtil; + +import java.io.IOException; +import java.util.Locale; +import java.util.Properties; + + +/** + * @author Administrator + */ + +public final class RadiusKnowledgeConfigurations { + + private static Properties propDefault = new Properties(); + private static Properties propService = new Properties(); + + + public static String getStringProperty(Integer type, String key) { + if (type == 0) { + return propService.getProperty(key); + } else if (type == 1) { + return propDefault.getProperty(key); + } else { + return null; + } + + } + + public static Integer getIntProperty(Integer type, String key) { + if (type == 0) { + return Integer.parseInt(propService.getProperty(key)); + } else if (type == 1) { + return Integer.parseInt(propDefault.getProperty(key)); + } else { + return null; + } + } + + public static Long getLongProperty(Integer type, String key) { + if (type == 0) { + return Long.parseLong(propService.getProperty(key)); + } else if (type == 1) { + return Long.parseLong(propDefault.getProperty(key)); + } else { + return null; + } + } + + public static Boolean getBooleanProperty(Integer type, String key) { + if (type == 0) { + return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true"); + } else if (type == 1) { + return StringUtil.equals(propDefault.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true"); + } else { + return null; + } + } + + static { + try { + propService.load(RadiusKnowledgeConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties")); + propDefault.load(RadiusKnowledgeConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties")); + } catch (IOException | RuntimeException e) { + propDefault = null; + propService = null; + } + } +} diff --git a/src/main/log4j.properties b/src/main/log4j.properties new file mode 100644 index 0000000..9d91936 --- /dev/null +++ b/src/main/log4j.properties @@ -0,0 +1,25 @@ +#Log4j +log4j.rootLogger=info,console,file +# 鎺у埗鍙版棩蹇楄缃 +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.Threshold=info +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n + +# 鏂囦欢鏃ュ織璁剧疆 +log4j.appender.file=org.apache.log4j.DailyRollingFileAppender +log4j.appender.file.Threshold=info +log4j.appender.file.encoding=UTF-8 +log4j.appender.file.Append=true +#璺緞璇风敤鐩稿璺緞锛屽仛濂界浉鍏虫祴璇曡緭鍑哄埌搴旂敤鐩笅 +log4j.appender.file.file=${nis.root}/log/galaxy-name.log +log4j.appender.file.DatePattern='.'yyyy-MM-dd +log4j.appender.file.layout=org.apache.log4j.PatternLayout +#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n +log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n +#MyBatis 閰嶇疆锛宑om.nis.web.dao鏄痬ybatis鎺ュ彛鎵鍦ㄥ寘 +log4j.logger.com.nis.web.dao=debug +#bonecp鏁版嵁婧愰厤缃 +log4j.category.com.jolbox=debug,console + + diff --git a/src/test/java/com/zdjizhi/FunctionTest.java b/src/test/java/com/zdjizhi/FunctionTest.java new file mode 100644 index 0000000..6b52828 --- /dev/null +++ b/src/test/java/com/zdjizhi/FunctionTest.java @@ -0,0 +1,24 @@ +package com.zdjizhi; + +import com.alibaba.fastjson.JSONObject; +import com.jayway.jsonpath.Configuration; +import com.jayway.jsonpath.JsonPath; +import org.junit.Test; + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2021/6/3011:46 + */ +public class FunctionTest { + + @Test + public void jsonTest(){ + String logs = "{\"name\":\"test\"}"; + Object document = Configuration.defaultConfiguration().jsonProvider().parse(logs); + System.out.println(JsonPath.read(document, "$.name").toString()); + JSONObject jsonObject = JSONObject.parseObject(logs); + System.out.println(jsonObject.getLong("age")); + } +} -- cgit v1.2.3