diff options
| author | qidaijie <[email protected]> | 2021-09-27 11:15:46 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2021-09-27 11:15:46 +0800 |
| commit | beeff0a9e03d3016caa4bb0aa5327b8a6ee0b8a0 (patch) | |
| tree | 5ea7bec72591d645ad6b2f6ba7c23dee31756bfe | |
| parent | 8e93eb3d028d331a22bdbb246f6d21433f7a1259 (diff) | |
提交2109版本
| -rw-r--r-- | pom.xml | 223 | ||||
| -rw-r--r-- | properties/default_config.properties | 29 | ||||
| -rw-r--r-- | properties/service_flow_config.properties | 28 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/bean/RadiusKnowledge.java | 62 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java | 76 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java | 46 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/functions/FilterLogFunction.java | 40 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java | 17 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java | 55 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/kafka/CertUtils.java | 36 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/kafka/Consumer.java | 41 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/kafka/Producer.java | 48 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java | 70 | ||||
| -rw-r--r-- | src/main/log4j.properties | 25 | ||||
| -rw-r--r-- | src/test/java/com/zdjizhi/FunctionTest.java | 24 |
15 files changed, 820 insertions, 0 deletions
@@ -0,0 +1,223 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>com.zdjizhi</groupId> + <artifactId>radius-account-knowledge</artifactId> + <version>210908-security</version> + + <name>radius-account-knowledge</name> + <url>http://www.example.com</url> + + + <repositories> + <repository> + <id>nexus</id> + <name>Team Nexus Repository</name> + <url>http://192.168.40.125:8099/content/groups/public</url> + </repository> + + <repository> + <id>maven-ali</id> + <url>http://maven.aliyun.com/nexus/content/groups/public/</url> + <releases> + <!--<enabled>true</enabled>--> + </releases> + <snapshots> + <!--<enabled>true</enabled>--> + <checksumPolicy>fail</checksumPolicy> + </snapshots> + </repository> + </repositories> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <flink.version>1.13.1</flink.version> + <hadoop.version>2.7.1</hadoop.version> + <kafka.version>1.0.0</kafka.version> + <hbase.version>2.2.3</hbase.version> + <!--<scope.type>provided</scope.type>--> + <scope.type>compile</scope.type> + </properties> + + <build> + <plugins> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>2.4.2</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <transformers> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>com.zdjizhi.topology.RadiusKnowledgeTopology</mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>io.github.zlika</groupId> + <artifactId>reproducible-build-maven-plugin</artifactId> + <version>0.2</version> + <executions> + <execution> + <goals> + <goal>strip-jar</goal> + </goals> + <phase>package</phase> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>2.3.2</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + </plugins> + <resources> + <resource> + <directory>properties</directory> + <includes> + <include>**/*.properties</include> + <include>**/*.xml</include> + </includes> + <filtering>false</filtering> + </resource> + + <resource> + <directory>src\main\java</directory> + <includes> + <include>log4j.properties</include> + </includes> + <filtering>false</filtering> + </resource> + </resources> + </build> + + <dependencies> + + <dependency> + <groupId>com.zdjizhi</groupId> + <artifactId>galaxy</artifactId> + <version>1.0.6</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>com.alibaba</groupId> + <artifactId>fastjson</artifactId> + <version>1.2.70</version> + </dependency> + + <!--<!– https://mvnrepository.com/artifact/org.apache.flink/flink-table –>--> + <!--<dependency>--> + <!--<groupId>org.apache.flink</groupId>--> + <!--<artifactId>flink-table</artifactId>--> + <!--<version>${flink.version}</version>--> + <!--<type>pom</type>--> + <!--<scope>${scope.type}</scope>--> + <!--</dependency>--> + + <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${flink.version}</version> + <scope>${scope.type}</scope> + </dependency> + + + <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.12</artifactId> + <version>${flink.version}</version> + <scope>${scope.type}</scope> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_2.12</artifactId> + <version>${flink.version}</version> + <scope>${scope.type}</scope> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka_2.12</artifactId> + <version>${flink.version}</version> + <scope>${scope.type}</scope> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${flink.version}</version> + <scope>${scope.type}</scope> + </dependency> + + <dependency> + <groupId>cglib</groupId> + <artifactId>cglib-nodep</artifactId> + <version>3.2.4</version> + </dependency> + + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <version>5.3.2</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>com.jayway.jsonpath</groupId> + <artifactId>json-path</artifactId> + <version>2.4.0</version> + </dependency> + + + <dependency> + <groupId>cn.hutool</groupId> + <artifactId>hutool-all</artifactId> + <version>5.5.2</version> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + + </dependencies> +</project> + diff --git a/properties/default_config.properties b/properties/default_config.properties new file mode 100644 index 0000000..021eebd --- /dev/null +++ b/properties/default_config.properties @@ -0,0 +1,29 @@ +#producer���ԵĴ������� +retries=0 + +#���ĺ������˵һ��Batch������֮��������ã��������Batch��û��д���������뷢�ͳ�ȥ�� +linger.ms=5 + +#����ڳ�ʱ֮ǰδ�յ���Ӧ���ͻ��˽��ڱ�Ҫʱ���·������� +request.timeout.ms=30000 + +#producer���ǰ���batch���з��͵�,���δ�С��Ĭ��:16384 +batch.size=262144 + +#Producer�����ڻ�����Ϣ�Ļ�������С +buffer.memory=67108864 + +#�������������ÿ�η���Kafka���������������С,Ĭ��1048576 +max.request.size=5242880 + +#kafka SASL��֤�û��� +kafka.user=admin + +#kafka SASL��SSL��֤���� +kafka.pin=galaxy2019 + +#kafka sink protocol; SSL or SASL +kafka.source.protocol=SASL + +#kafka sink protocol; SSL or SASL +kafka.sink.protocol=SASL
\ No newline at end of file diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties new file mode 100644 index 0000000..eb2cb4c --- /dev/null +++ b/properties/service_flow_config.properties @@ -0,0 +1,28 @@ +#--------------------------------地址配置------------------------------# + +#管理kafka地址 +input.kafka.servers=192.168.44.12:9092 + +#管理输出kafka地址 +output.kafka.servers=192.168.44.12:9092 + +#--------------------------------Kafka消费组信息------------------------------# + +#kafka 接收数据topic +input.kafka.topic=RADIUS-RECORD + +#补全数据 输出 topic +output.kafka.topic=RADIUS-ONFF-LOG + +#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; +group.id=radius-on-off-flink-20210615 + +#生产者压缩模式 none or snappy +producer.kafka.compression.type=none + +#生产者ack +producer.ack=1 + +#--------------------------------topology配置------------------------------# +#第三方工具地址 +tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/bean/RadiusKnowledge.java b/src/main/java/com/zdjizhi/bean/RadiusKnowledge.java new file mode 100644 index 0000000..43e71a1 --- /dev/null +++ b/src/main/java/com/zdjizhi/bean/RadiusKnowledge.java @@ -0,0 +1,62 @@ +package com.zdjizhi.bean; + +/** + * @author qidaijie + */ +public class RadiusKnowledge { + + private String framed_ip; + private String account; + private String acct_session_id; + private long acct_status_type; + private long acct_session_time; + private long event_timestamp; + + public String getFramed_ip() { + return framed_ip; + } + + public void setFramed_ip(String framed_ip) { + this.framed_ip = framed_ip; + } + + public String getAccount() { + return account; + } + + public void setAccount(String account) { + this.account = account; + } + + public String getAcct_session_id() { + return acct_session_id; + } + + public void setAcct_session_id(String acct_session_id) { + this.acct_session_id = acct_session_id; + } + + public long getAcct_status_type() { + return acct_status_type; + } + + public void setAcct_status_type(long acct_status_type) { + this.acct_status_type = acct_status_type; + } + + public long getAcct_session_time() { + return acct_session_time; + } + + public void setAcct_session_time(long acct_session_time) { + this.acct_session_time = acct_session_time; + } + + public long getEvent_timestamp() { + return event_timestamp; + } + + public void setEvent_timestamp(long event_timestamp) { + this.event_timestamp = event_timestamp; + } +} diff --git a/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java b/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java new file mode 100644 index 0000000..d267b6b --- /dev/null +++ b/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java @@ -0,0 +1,76 @@ +package com.zdjizhi.common; + + +import com.zdjizhi.utils.system.RadiusKnowledgeConfigurations; + +/** + * @author Administrator + */ +public class RadiusKnowledgeConfig { + /** + * 4- Accounting-Request(账户授权) + */ + public static final int ACCOUNTING_REQUEST = 4; + /** + * 1、开始计费 + */ + public static final int START_BILLING = 1; + /** + * 2、停止计费 + */ + public static final int STOP_BILLING = 2; + /** + * 报文类型 + */ + public static final String RADIUS_PACKET_TYPE = "radius_packet_type"; + /** + * 计费请求报文类型 + */ + public static final String RADIUS_ACCT_STATUS_TYPE = "radius_acct_status_type"; + /** + * 发送计费请求报文时间戳 + */ + public static final String RADIUS_EVENT_TIMESTAMP = "radius_event_timestamp"; + + /** + * 一个用户多个计费ID关联属性 + */ + public static final String RADIUS_MULTI_SESSION_ID = "radius_acct_multi_session_id"; + + /** + * 用户的在线时长,以秒为单位 + */ + public static final String RADIUS_SESSION_TIME = "radius_acct_session_time"; + + + /** + * System + */ + + /** + * kafka + */ + public static final String INPUT_KAFKA_SERVERS = RadiusKnowledgeConfigurations.getStringProperty(0, "input.kafka.servers"); + public static final String OUTPUT_KAFKA_SERVERS = RadiusKnowledgeConfigurations.getStringProperty(0, "output.kafka.servers"); + public static final String GROUP_ID = RadiusKnowledgeConfigurations.getStringProperty(0, "group.id"); + public static final String OUTPUT_KAFKA_TOPIC = RadiusKnowledgeConfigurations.getStringProperty(0, "output.kafka.topic"); + public static final String INPUT_KAFKA_TOPIC = RadiusKnowledgeConfigurations.getStringProperty(0, "input.kafka.topic"); + public static final String PRODUCER_ACK = RadiusKnowledgeConfigurations.getStringProperty(0, "producer.ack"); + public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = RadiusKnowledgeConfigurations.getStringProperty(0, "producer.kafka.compression.type"); + + /** + * default config + */ + public static final String RETRIES = RadiusKnowledgeConfigurations.getStringProperty(1, "retries"); + public static final String LINGER_MS = RadiusKnowledgeConfigurations.getStringProperty(1, "linger.ms"); + public static final Integer REQUEST_TIMEOUT_MS = RadiusKnowledgeConfigurations.getIntProperty(1, "request.timeout.ms"); + public static final Integer BATCH_SIZE = RadiusKnowledgeConfigurations.getIntProperty(1, "batch.size"); + public static final Integer BUFFER_MEMORY = RadiusKnowledgeConfigurations.getIntProperty(1, "buffer.memory"); + public static final Integer MAX_REQUEST_SIZE = RadiusKnowledgeConfigurations.getIntProperty(1, "max.request.size"); + public static final String TOOLS_LIBRARY = RadiusKnowledgeConfigurations.getStringProperty(0, "tools.library"); + public static final String KAFKA_SOURCE_PROTOCOL = RadiusKnowledgeConfigurations.getStringProperty(1, "kafka.source.protocol"); + public static final String KAFKA_SINK_PROTOCOL = RadiusKnowledgeConfigurations.getStringProperty(1, "kafka.sink.protocol"); + public static final String KAFKA_USER = RadiusKnowledgeConfigurations.getStringProperty(1, "kafka.user"); + public static final String KAFKA_PIN = RadiusKnowledgeConfigurations.getStringProperty(1, "kafka.pin"); + +}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java b/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java new file mode 100644 index 0000000..77418a6 --- /dev/null +++ b/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java @@ -0,0 +1,46 @@ +package com.zdjizhi.topology; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.RadiusKnowledgeConfig; +import com.zdjizhi.utils.functions.FilterLogFunction; +import com.zdjizhi.utils.functions.FilterNullFunction; +import com.zdjizhi.utils.functions.MapCompletedFunction; +import com.zdjizhi.utils.kafka.Consumer; +import com.zdjizhi.utils.kafka.Producer; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +/** + * @author qidaijie + * @Package com.zdjizhi.topology + * @Description: + * @date 2021/5/2016:42 + */ +public class RadiusKnowledgeTopology { + private static final Log logger = LogFactory.get(); + + public static void main(String[] args) { + final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); + +// environment.enableCheckpointing(5000); + + DataStreamSource<String> streamSource = environment.addSource(Consumer.getKafkaConsumer()); + + DataStream<String> result = streamSource.filter(new FilterLogFunction()).name("FilterOriginalData") + .map(new MapCompletedFunction()).name("RadiusOnOffMap") + .filter(new FilterNullFunction()).name("FilterAbnormalData"); + + result.addSink(Producer.getKafkaProducer()).name("LogSink"); + + try { + environment.execute("RADIUS-ON-OFF"); + } catch (Exception e) { + logger.error("This Flink task start ERROR! Exception information is :" + e); + } + + } + + +} diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterLogFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterLogFunction.java new file mode 100644 index 0000000..723ceaa --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/FilterLogFunction.java @@ -0,0 +1,40 @@ +package com.zdjizhi.utils.functions; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.common.RadiusKnowledgeConfig; +import com.zdjizhi.utils.StringUtil; +import org.apache.flink.api.common.functions.FilterFunction; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/5/2715:01 + */ +public class FilterLogFunction implements FilterFunction<String> { + private static final Log logger = LogFactory.get(); + + @Override + public boolean filter(String message) { + boolean legitimate = false; + try { + if (StringUtil.isNotBlank(message)) { + JSONObject jsonObject = JSONObject.parseObject(message); + if (jsonObject.containsKey(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE)) { + int packetType = jsonObject.getInteger(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE); + if (RadiusKnowledgeConfig.ACCOUNTING_REQUEST == packetType) { + int statusType = jsonObject.getInteger(RadiusKnowledgeConfig.RADIUS_ACCT_STATUS_TYPE); + if (RadiusKnowledgeConfig.START_BILLING == statusType || RadiusKnowledgeConfig.STOP_BILLING == statusType) { + legitimate = true; + } + } + } + } + } catch (RuntimeException re) { + logger.error("数据解析异常,异常信息:" + re); + } + return legitimate; + } +} diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java new file mode 100644 index 0000000..de507ad --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java @@ -0,0 +1,17 @@ +package com.zdjizhi.utils.functions; + +import com.zdjizhi.utils.StringUtil; +import org.apache.flink.api.common.functions.FilterFunction; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/5/2715:01 + */ +public class FilterNullFunction implements FilterFunction<String> { + @Override + public boolean filter(String message) { + return StringUtil.isNotBlank(message); + } +} diff --git a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java new file mode 100644 index 0000000..e58419c --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java @@ -0,0 +1,55 @@ +package com.zdjizhi.utils.functions; + +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.bean.RadiusKnowledge; +import com.zdjizhi.common.RadiusKnowledgeConfig; +import org.apache.flink.api.common.functions.MapFunction; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/5/2715:01 + */ +public class MapCompletedFunction implements MapFunction<String, String> { + + @Override + public String map(String logs) { + try { + JSONObject jsonObject = JSONObject.parseObject(logs); + RadiusKnowledge knowledge = new RadiusKnowledge(); + knowledge.setFramed_ip(jsonObject.getString("radius_framed_ip")); + knowledge.setAccount(jsonObject.getString("radius_account")); + knowledge.setAcct_status_type(jsonObject.getInteger("radius_acct_status_type")); + /* + 如果存在时间戳则选择此时间戳没有获取当前时间 + */ + if (jsonObject.containsKey(RadiusKnowledgeConfig.RADIUS_EVENT_TIMESTAMP)) { + knowledge.setEvent_timestamp(jsonObject.getLong("radius_event_timestamp")); + } else { + knowledge.setEvent_timestamp(System.currentTimeMillis() / 1000); + } + /* + * 标识同一个连接: + * 1.数据若存在acct_multi_session_id属性,取该属性 + * 2.不存在取 acct_session_id + */ + if (jsonObject.containsKey(RadiusKnowledgeConfig.RADIUS_MULTI_SESSION_ID)) { + knowledge.setAcct_session_id(jsonObject.getString("radius_acct_multi_session_id")); + } else { + knowledge.setAcct_session_id(jsonObject.getString("radius_acct_session_id")); + } + /* + 用户的在线时长,以秒为单位,下线用户无此属性,默认为0 + */ + if (jsonObject.containsKey(RadiusKnowledgeConfig.RADIUS_SESSION_TIME)) { + knowledge.setAcct_session_time(jsonObject.getInteger("radius_acct_session_time")); + } else { + knowledge.setAcct_session_time(0); + } + return JSONObject.toJSONString(knowledge); + } catch (RuntimeException e) { + return ""; + } + } +} diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java new file mode 100644 index 0000000..a325dd6 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java @@ -0,0 +1,36 @@ +package com.zdjizhi.utils.kafka; + +import com.zdjizhi.common.RadiusKnowledgeConfig; +import org.apache.kafka.common.config.SslConfigs; + +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2021/9/610:37 + */ +class CertUtils { + static void chooseCert(String type, Properties properties) { + switch (type) { + case "SSL": + properties.put("security.protocol", "SSL"); + properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); + properties.put("ssl.keystore.location", RadiusKnowledgeConfig.TOOLS_LIBRARY + "keystore.jks"); + properties.put("ssl.keystore.password", RadiusKnowledgeConfig.KAFKA_PIN); + properties.put("ssl.truststore.location", RadiusKnowledgeConfig.TOOLS_LIBRARY + "truststore.jks"); + properties.put("ssl.truststore.password", RadiusKnowledgeConfig.KAFKA_PIN); + properties.put("ssl.key.password", RadiusKnowledgeConfig.KAFKA_PIN); + break; + case "SASL": + properties.put("security.protocol", "SASL_PLAINTEXT"); + properties.put("sasl.mechanism", "PLAIN"); + properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + + RadiusKnowledgeConfig.KAFKA_USER + " password=" + RadiusKnowledgeConfig.KAFKA_PIN + ";"); + break; + default: + } + + } +} diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java new file mode 100644 index 0000000..7e75117 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java @@ -0,0 +1,41 @@ +package com.zdjizhi.utils.kafka; + +import com.zdjizhi.common.RadiusKnowledgeConfig; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; + +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2021/6/813:54 + */ +public class Consumer { + private static Properties createConsumerConfig() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", RadiusKnowledgeConfig.INPUT_KAFKA_SERVERS); + properties.put("group.id", RadiusKnowledgeConfig.GROUP_ID); + properties.put("session.timeout.ms", "60000"); + properties.put("max.poll.records", 3000); + properties.put("max.partition.fetch.bytes", 31457280); + properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + + CertUtils.chooseCert(RadiusKnowledgeConfig.KAFKA_SOURCE_PROTOCOL,properties); + + return properties; + } + + public static FlinkKafkaConsumer<String> getKafkaConsumer() { + FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(RadiusKnowledgeConfig.INPUT_KAFKA_TOPIC, + new SimpleStringSchema(), createConsumerConfig()); + + kafkaConsumer.setCommitOffsetsOnCheckpoints(true); + kafkaConsumer.setStartFromGroupOffsets(); + + return kafkaConsumer; + } +} diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/Producer.java new file mode 100644 index 0000000..ce8fab3 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/kafka/Producer.java @@ -0,0 +1,48 @@ +package com.zdjizhi.utils.kafka; + +import com.zdjizhi.common.RadiusKnowledgeConfig; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; + +import java.util.Optional; +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2021/6/814:04 + */ +public class Producer { + + private static Properties createProducerConfig() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", RadiusKnowledgeConfig.OUTPUT_KAFKA_SERVERS); + properties.put("acks", RadiusKnowledgeConfig.PRODUCER_ACK); + properties.put("retries", RadiusKnowledgeConfig.RETRIES); + properties.put("linger.ms", RadiusKnowledgeConfig.LINGER_MS); + properties.put("request.timeout.ms", RadiusKnowledgeConfig.REQUEST_TIMEOUT_MS); + properties.put("batch.size", RadiusKnowledgeConfig.BATCH_SIZE); + properties.put("buffer.memory", RadiusKnowledgeConfig.BUFFER_MEMORY); + properties.put("max.request.size", RadiusKnowledgeConfig.MAX_REQUEST_SIZE); + properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, RadiusKnowledgeConfig.PRODUCER_KAFKA_COMPRESSION_TYPE); + + CertUtils.chooseCert(RadiusKnowledgeConfig.KAFKA_SINK_PROTOCOL, properties); + + return properties; + } + + + public static FlinkKafkaProducer<String> getKafkaProducer() { + FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>( + RadiusKnowledgeConfig.OUTPUT_KAFKA_TOPIC, + new SimpleStringSchema(), + createProducerConfig(), Optional.empty()); + + kafkaProducer.setLogFailuresOnly(false); +// kafkaProducer.setWriteTimestampToKafka(true); + + return kafkaProducer; + } +} diff --git a/src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java b/src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java new file mode 100644 index 0000000..7b7c046 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java @@ -0,0 +1,70 @@ +package com.zdjizhi.utils.system; + +import com.zdjizhi.utils.StringUtil; + +import java.io.IOException; +import java.util.Locale; +import java.util.Properties; + + +/** + * @author Administrator + */ + +public final class RadiusKnowledgeConfigurations { + + private static Properties propDefault = new Properties(); + private static Properties propService = new Properties(); + + + public static String getStringProperty(Integer type, String key) { + if (type == 0) { + return propService.getProperty(key); + } else if (type == 1) { + return propDefault.getProperty(key); + } else { + return null; + } + + } + + public static Integer getIntProperty(Integer type, String key) { + if (type == 0) { + return Integer.parseInt(propService.getProperty(key)); + } else if (type == 1) { + return Integer.parseInt(propDefault.getProperty(key)); + } else { + return null; + } + } + + public static Long getLongProperty(Integer type, String key) { + if (type == 0) { + return Long.parseLong(propService.getProperty(key)); + } else if (type == 1) { + return Long.parseLong(propDefault.getProperty(key)); + } else { + return null; + } + } + + public static Boolean getBooleanProperty(Integer type, String key) { + if (type == 0) { + return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true"); + } else if (type == 1) { + return StringUtil.equals(propDefault.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true"); + } else { + return null; + } + } + + static { + try { + propService.load(RadiusKnowledgeConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties")); + propDefault.load(RadiusKnowledgeConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties")); + } catch (IOException | RuntimeException e) { + propDefault = null; + propService = null; + } + } +} diff --git a/src/main/log4j.properties b/src/main/log4j.properties new file mode 100644 index 0000000..9d91936 --- /dev/null +++ b/src/main/log4j.properties @@ -0,0 +1,25 @@ +#Log4j +log4j.rootLogger=info,console,file +# 控制台日志设置 +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.Threshold=info +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n + +# 文件日志设置 +log4j.appender.file=org.apache.log4j.DailyRollingFileAppender +log4j.appender.file.Threshold=info +log4j.appender.file.encoding=UTF-8 +log4j.appender.file.Append=true +#路径请用相对路径,做好相关测试输出到应用目下 +log4j.appender.file.file=${nis.root}/log/galaxy-name.log +log4j.appender.file.DatePattern='.'yyyy-MM-dd +log4j.appender.file.layout=org.apache.log4j.PatternLayout +#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n +log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n +#MyBatis 配置,com.nis.web.dao是mybatis接口所在包 +log4j.logger.com.nis.web.dao=debug +#bonecp数据源配置 +log4j.category.com.jolbox=debug,console + + diff --git a/src/test/java/com/zdjizhi/FunctionTest.java b/src/test/java/com/zdjizhi/FunctionTest.java new file mode 100644 index 0000000..6b52828 --- /dev/null +++ b/src/test/java/com/zdjizhi/FunctionTest.java @@ -0,0 +1,24 @@ +package com.zdjizhi; + +import com.alibaba.fastjson.JSONObject; +import com.jayway.jsonpath.Configuration; +import com.jayway.jsonpath.JsonPath; +import org.junit.Test; + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2021/6/3011:46 + */ +public class FunctionTest { + + @Test + public void jsonTest(){ + String logs = "{\"name\":\"test\"}"; + Object document = Configuration.defaultConfiguration().jsonProvider().parse(logs); + System.out.println(JsonPath.read(document, "$.name").toString()); + JSONObject jsonObject = JSONObject.parseObject(logs); + System.out.println(jsonObject.getLong("age")); + } +} |
