summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorqidaijie <[email protected]>2023-11-16 11:14:19 +0800
committerqidaijie <[email protected]>2023-11-16 11:14:19 +0800
commit8214c9255a85d276731c0ccdb7018467af335292 (patch)
tree96579242ef006a40d8d01a2a571014d9b31cfff6
parent686a07bbb107b53cb5682aae7423e9d019e59830 (diff)
优化配置文件加载方式feature/23.11
-rw-r--r--pom.xml19
-rw-r--r--properties/application.properties23
-rw-r--r--properties/default_config.properties43
-rw-r--r--properties/service_flow_config.properties20
-rw-r--r--src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java74
-rw-r--r--src/main/java/com/zdjizhi/config/RadiusKnowledgeConfigs.java52
-rw-r--r--src/main/java/com/zdjizhi/config/RadiusKnowledgeConfiguration.java44
-rw-r--r--src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java50
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/ConversionProcess.java (renamed from src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java)42
-rw-r--r--src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java17
-rw-r--r--src/main/java/com/zdjizhi/utils/kafka/CertUtils.java48
-rw-r--r--src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java55
-rw-r--r--src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java48
-rw-r--r--src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java75
-rw-r--r--src/test/java/com/zdjizhi/JsonTest.java31
-rw-r--r--src/test/java/com/zdjizhi/testData1
16 files changed, 258 insertions, 384 deletions
diff --git a/pom.xml b/pom.xml
index 0069c25..3360745 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>radius-account-knowledge</artifactId>
- <version>220816-VSYS</version>
+ <version>1.0.0</version>
<name>radius-account-knowledge</name>
<url>http://www.example.com</url>
@@ -35,12 +35,13 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.1</flink.version>
+ <galaxy.tools.version>1.2.1</galaxy.tools.version>
<hadoop.version>2.7.1</hadoop.version>
<kafka.version>1.0.0</kafka.version>
<hbase.version>2.2.3</hbase.version>
<hutool.version>5.7.17</hutool.version>
<jsonpath.version>2.4.0</jsonpath.version>
- <fastjson.version>2.0.32</fastjson.version>
+ <fastjson.version>2.0.40</fastjson.version>
<scope.type>provided</scope.type>
<!--<scope.type>compile</scope.type>-->
</properties>
@@ -117,19 +118,9 @@
<dependencies>
<dependency>
- <groupId>com.zdjizhi</groupId>
+ <groupId>com.geedgenetworks</groupId>
<artifactId>galaxy</artifactId>
- <version>1.1.3</version>
- <exclusions>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>log4j-over-slf4j</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- </exclusions>
+ <version>${galaxy.tools.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
diff --git a/properties/application.properties b/properties/application.properties
new file mode 100644
index 0000000..c2e98e0
--- /dev/null
+++ b/properties/application.properties
@@ -0,0 +1,23 @@
+#kafka 接收数据topic
+source.kafka.topic=RADIUS-RECORD
+
+source.kafka.props.bootstrap.servers=192.168.44.12:9094
+
+source.kafka.props.group.id=radius-onoff-231116-1
+
+source.kafka.props.security.protocol=SASL_PLAINTEXT
+
+source.kafka.props.sasl.mechanism=PLAIN
+
+source.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
+
+#补全数据 输出 topic
+sink.kafka.topic=RADIUS-ONFF
+
+sink.kafka.props.bootstrap.servers=192.168.44.12:9094
+
+sink.kafka.props.security.protocol=SASL_PLAINTEXT
+
+sink.kafka.props.sasl.mechanism=PLAIN
+
+sink.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019"; \ No newline at end of file
diff --git a/properties/default_config.properties b/properties/default_config.properties
deleted file mode 100644
index a683f33..0000000
--- a/properties/default_config.properties
+++ /dev/null
@@ -1,43 +0,0 @@
-#====================Kafka KafkaConsumer====================#
-#kafka source connection timeout
-session.timeout.ms=60000
-
-#kafka source poll
-max.poll.records=3000
-
-#kafka source poll bytes
-max.partition.fetch.bytes=31457280
-
-#====================Kafka KafkaProducer====================#
-#producer���ԵĴ�������
-retries=0
-
-#���ĺ������˵һ��Batch������֮��������ã��������Batch��û��д���������뷢�ͳ�ȥ��
-linger.ms=10
-
-#����ڳ�ʱ֮ǰδ�յ���Ӧ���ͻ��˽��ڱ�Ҫʱ���·�������
-request.timeout.ms=30000
-
-#producer���ǰ���batch���з��͵�,���δ�С��Ĭ��:16384
-batch.size=262144
-
-#Producer�����ڻ�����Ϣ�Ļ�������С
-#128M
-buffer.memory=134217728
-
-#�������������ÿ�η��͸�Kafka���������������С,Ĭ��1048576
-#10M
-max.request.size=10485760
-
-#������ѹ��ģʽ none or snappy
-producer.kafka.compression.type=none
-
-#������ack
-producer.ack=1
-
-#====================kafka default====================#
-#kafka SASL��֤�û���-����
-kafka.user=nsyGpHKGFA4KW0zro9MDdw==
-
-#kafka SASL��SSL��֤����-����
-kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ \ No newline at end of file
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
deleted file mode 100644
index 05f7466..0000000
--- a/properties/service_flow_config.properties
+++ /dev/null
@@ -1,20 +0,0 @@
-#--------------------------------kafka消费组信息------------------------------#
-#管理kafka地址
-source.kafka.servers=192.168.44.12:9094
-
-#kafka 接收数据topic
-source.kafka.topic=test
-
-#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据;
-group.id=radius-on-off-flink-20210615
-
-#--------------------------------Kafka生产者信息------------------------------#
-#管理输出kafka地址
-sink.kafka.servers=192.168.44.12:9094
-
-#补全数据 输出 topic
-sink.kafka.topic=RADIUS-ONFF
-
-#--------------------------------topology配置------------------------------#
-#定位库地址
-tools.library=D:\\workerspace\\dat\\ \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java b/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java
deleted file mode 100644
index 04ff500..0000000
--- a/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package com.zdjizhi.common;
-
-
-import com.zdjizhi.utils.system.RadiusKnowledgeConfigurations;
-import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
-
-/**
- * @author Administrator
- */
-public class RadiusKnowledgeConfig {
-
- private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
-
- static {
- encryptor.setPassword("galaxy");
- }
-
- /**
- * 4- Accounting-Request(账户授权)
- */
- public static final int ACCOUNTING_REQUEST = 4;
- /**
- * 1、开始计费
- */
- public static final int START_BILLING = 1;
-
- /**
- * 3、过渡计费
- */
- public static final int UPDATE_BILLING = 3;
- /**
- * 2、停止计费
- */
- public static final int STOP_BILLING = 2;
-
- /**
- * 报文类型
- */
- public static final String RADIUS_PACKET_TYPE = "radius_packet_type";
-
-
-
- /**
- * kafka
- */
- public static final String SOURCE_KAFKA_SERVERS = RadiusKnowledgeConfigurations.getStringProperty(0, "source.kafka.servers");
- public static final String SOURCE_KAFKA_TOPIC = RadiusKnowledgeConfigurations.getStringProperty(0, "source.kafka.topic");
- public static final String SINK_KAFKA_SERVERS = RadiusKnowledgeConfigurations.getStringProperty(0, "sink.kafka.servers");
- public static final String SINK_KAFKA_TOPIC = RadiusKnowledgeConfigurations.getStringProperty(0, "sink.kafka.topic");
- public static final String GROUP_ID = RadiusKnowledgeConfigurations.getStringProperty(0, "group.id");
- public static final String PRODUCER_ACK = RadiusKnowledgeConfigurations.getStringProperty(1, "producer.ack");
- public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = RadiusKnowledgeConfigurations.getStringProperty(1, "producer.kafka.compression.type");
-
- /**
- * default config
- */
- public static final String RETRIES = RadiusKnowledgeConfigurations.getStringProperty(1, "retries");
- public static final String LINGER_MS = RadiusKnowledgeConfigurations.getStringProperty(1, "linger.ms");
- public static final Integer REQUEST_TIMEOUT_MS = RadiusKnowledgeConfigurations.getIntProperty(1, "request.timeout.ms");
- public static final Integer BATCH_SIZE = RadiusKnowledgeConfigurations.getIntProperty(1, "batch.size");
- public static final Integer BUFFER_MEMORY = RadiusKnowledgeConfigurations.getIntProperty(1, "buffer.memory");
- public static final Integer MAX_REQUEST_SIZE = RadiusKnowledgeConfigurations.getIntProperty(1, "max.request.size");
- public static final String TOOLS_LIBRARY = RadiusKnowledgeConfigurations.getStringProperty(0, "tools.library");
- public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(RadiusKnowledgeConfigurations.getStringProperty(1, "kafka.user"));
- public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(RadiusKnowledgeConfigurations.getStringProperty(1, "kafka.pin"));
-
- /**
- * kafka source config
- */
- public static final String SESSION_TIMEOUT_MS = RadiusKnowledgeConfigurations.getStringProperty(1, "session.timeout.ms");
- public static final String MAX_POLL_RECORDS = RadiusKnowledgeConfigurations.getStringProperty(1, "max.poll.records");
- public static final String MAX_PARTITION_FETCH_BYTES = RadiusKnowledgeConfigurations.getStringProperty(1, "max.partition.fetch.bytes");
-
-} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/config/RadiusKnowledgeConfigs.java b/src/main/java/com/zdjizhi/config/RadiusKnowledgeConfigs.java
new file mode 100644
index 0000000..0221451
--- /dev/null
+++ b/src/main/java/com/zdjizhi/config/RadiusKnowledgeConfigs.java
@@ -0,0 +1,52 @@
+package com.zdjizhi.config;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Containing configuration options for the Fusion application.
+ *
+ * @author chaoc
+ * @since 1.0
+ */
+public class RadiusKnowledgeConfigs {
+
+ /**
+ * The prefix for Kafka properties used in the source.
+ */
+ public static final String SOURCE_KAFKA_PROPERTIES_PREFIX = "source.kafka.props.";
+
+ /**
+ * The prefix for Kafka properties used in the sink.
+ */
+ public static final String SINK_KAFKA_PROPERTIES_PREFIX = "sink.kafka.props.";
+
+
+ public static final ConfigOption<String> SOURCE_KAFKA_TOPIC =
+ ConfigOptions.key("source.kafka.topic")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The Kafka topic used in the source.");
+
+
+ public static final ConfigOption<String> SINK_KAFKA_TOPIC =
+ ConfigOptions.key("sink.kafka.topic")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The Kafka topic used in the sink.");
+
+
+ public static final ConfigOption<String> STARTUP_MODE =
+ ConfigOptions.key("startup.mode")
+ .stringType()
+ .defaultValue("group")
+ .withDescription("The offset commit mode for the consumer.");
+
+
+ public static final ConfigOption<Boolean> LOG_FAILURES_ONLY =
+ ConfigOptions.key("log.failures.only")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Defines whether the producer should fail on errors, or only log them.");
+
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/config/RadiusKnowledgeConfiguration.java b/src/main/java/com/zdjizhi/config/RadiusKnowledgeConfiguration.java
new file mode 100644
index 0000000..06359e4
--- /dev/null
+++ b/src/main/java/com/zdjizhi/config/RadiusKnowledgeConfiguration.java
@@ -0,0 +1,44 @@
+package com.zdjizhi.config;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.util.Properties;
+
+/**
+ * A wrapper class that extends the Flink `Configuration` to provide utility methods for handling
+ * properties with a specific prefix. This class allows retrieving properties that start with the
+ * given `prefix` and converts them into a `java.util.Properties` object.
+ *
+ * @author chaoc
+ * @since 1.0
+ */
+
+public class RadiusKnowledgeConfiguration {
+ private final Configuration config;
+
+ public RadiusKnowledgeConfiguration(final Configuration config) {
+ this.config = config;
+ }
+
+ /**
+ * Retrieves properties from the underlying `Configuration` instance that start with the specified
+ * `prefix`. The properties are then converted into a `java.util.Properties` object and returned.
+ *
+ * @param prefix The prefix to filter properties.
+ * @return A `java.util.Properties` object containing the properties with the specified prefix.
+ */
+ public Properties getProperties(final String prefix) {
+ if (prefix == null) {
+ final Properties props = new Properties();
+ props.putAll(config.toMap());
+ return props;
+ }
+ return config.toMap()
+ .entrySet()
+ .stream()
+ .filter(entry -> entry.getKey().startsWith(prefix))
+ .collect(Properties::new, (props, e) ->
+ props.setProperty(e.getKey().substring(prefix.length()), e.getValue()),
+ Properties::putAll);
+ }
+}
diff --git a/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java b/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java
index afd714a..6edb0ee 100644
--- a/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java
+++ b/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java
@@ -1,16 +1,18 @@
package com.zdjizhi.topology;
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.RadiusKnowledgeConfig;
-import com.zdjizhi.utils.functions.FilterNullFunction;
-import com.zdjizhi.utils.functions.MapCompletedFunction;
+
+import com.zdjizhi.config.RadiusKnowledgeConfiguration;
+import com.zdjizhi.utils.functions.ConversionProcess;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import com.zdjizhi.utils.kafka.KafkaProducer;
-import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import static com.zdjizhi.config.RadiusKnowledgeConfigs.*;
+
/**
* @author qidaijie
* @Package com.zdjizhi.topology
@@ -18,23 +20,37 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
* @date 2021/5/2016:42
*/
public class RadiusKnowledgeTopology {
- private static final Log logger = LogFactory.get();
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
+
+ // param check
+ if (args.length < 1) {
+ throw new IllegalArgumentException("Error: Not found properties path. " +
+ "\nUsage: flink -c xxx xxx.jar app.properties.");
+ }
+
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStreamSource<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer());
+ ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]);
+ final Configuration config = tool.getConfiguration();
+ environment.getConfig().setGlobalJobParameters(config);
+ final RadiusKnowledgeConfiguration fusionConfiguration = new RadiusKnowledgeConfiguration(config);
- DataStream<String> result = streamSource.map(new MapCompletedFunction()).name(RadiusKnowledgeConfig.SOURCE_KAFKA_TOPIC)
- .filter(new FilterNullFunction()).name("FilterAbnormalData");
+ DataStreamSource<String> streamSource = environment.addSource(
+ KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX),
+ config.get(SOURCE_KAFKA_TOPIC),
+ config.get(STARTUP_MODE)));
- result.addSink(KafkaProducer.getKafkaProducer()).name(RadiusKnowledgeConfig.SINK_KAFKA_TOPIC);
- try {
- environment.execute("RADIUS-ON-OFF");
- } catch (Exception e) {
- logger.error("This Flink task start ERROR! Exception information is :" + e);
- }
+ SingleOutputStreamOperator<String> process = streamSource.process(new ConversionProcess());
+
+
+ process.addSink(KafkaProducer.getKafkaProducer(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX),
+ config.get(SINK_KAFKA_TOPIC),
+ config.get(LOG_FAILURES_ONLY)));
+
+
+ environment.execute("RADIUS-ON-OFF");
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/ConversionProcess.java
index 40eef71..5581b19 100644
--- a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/ConversionProcess.java
@@ -3,46 +3,39 @@ package com.zdjizhi.utils.functions;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSONObject;
-import com.zdjizhi.common.RadiusKnowledgeConfig;
-import com.zdjizhi.utils.StringUtil;
-import org.apache.flink.api.common.functions.MapFunction;
+import com.alibaba.fastjson2.JSONPath;
+import com.geedgenetworks.utils.StringUtil;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
/**
* @author qidaijie
* @Package com.zdjizhi.utils.functions
* @Description:
- * @date 2021/5/2715:01
+ * @date 2023/11/1610:30
*/
-public class MapCompletedFunction implements MapFunction<String, String> {
+public class ConversionProcess extends ProcessFunction<String, String> {
private static final Log logger = LogFactory.get();
+ private static final String dataTypeExpr = "[?(@.radius_packet_type = 4)]";
+
@Override
- public String map(String logs) {
- String result = null;
+ public void processElement(String message, Context ctx, Collector<String> out) throws Exception {
try {
- if (StringUtil.isNotBlank(logs)) {
- JSONObject radiusLog = JSONObject.parse(logs);
- if (radiusLog.containsKey(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE)) {
- int packetType = radiusLog.getIntValue(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE);
- if (RadiusKnowledgeConfig.ACCOUNTING_REQUEST == packetType) {
- result = GetKnowledgeLogs(radiusLog);
- }
-
+ if (StringUtil.isNotBlank(message)) {
+ Object isAccountingData = JSONPath.eval(message, dataTypeExpr);
+ if (isAccountingData != null) {
+ JSONObject radiusLog = JSONObject.parse(message);
+ GetKnowledgeLogs(radiusLog, out);
}
}
} catch (RuntimeException e) {
logger.error("Radius log parsing exception,Detailed info:" + e);
}
- return result;
}
- /**
- * 获取RadiusOnOff日志
- *
- * @param radiusLog 原始日志
- * @return OnOff日志
- */
- private static String GetKnowledgeLogs(JSONObject radiusLog) {
+
+ private static void GetKnowledgeLogs(JSONObject radiusLog, Collector<String> out) {
JSONObject knowledge = new JSONObject();
String framedIp = radiusLog.getString("radius_framed_ip");
@@ -76,8 +69,7 @@ public class MapCompletedFunction implements MapFunction<String, String> {
//用户的在线时长,以秒为单位,下线用户无此属性,默认为0
knowledge.put("acct_session_time", radiusLog.getLongValue("radius_acct_session_time", 0L));
- return knowledge.toString();
+ out.collect(knowledge.toString());
}
- return "";
}
}
diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
deleted file mode 100644
index de507ad..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package com.zdjizhi.utils.functions;
-
-import com.zdjizhi.utils.StringUtil;
-import org.apache.flink.api.common.functions.FilterFunction;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/5/2715:01
- */
-public class FilterNullFunction implements FilterFunction<String> {
- @Override
- public boolean filter(String message) {
- return StringUtil.isNotBlank(message);
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
deleted file mode 100644
index 88bc377..0000000
--- a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package com.zdjizhi.utils.kafka;
-
-import com.zdjizhi.common.RadiusKnowledgeConfig;
-import org.apache.kafka.common.config.SslConfigs;
-
-import java.util.Properties;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.kafka
- * @Description:
- * @date 2021/9/610:37
- */
-class CertUtils {
- /**
- * Kafka SASL认证端口
- */
- private static final String SASL_PORT = "9094";
-
- /**
- * Kafka SSL认证端口
- */
- private static final String SSL_PORT = "9095";
-
- /**
- * 根据连接信息端口判断认证方式。
- *
- * @param servers kafka 连接信息
- * @param properties kafka 连接配置信息
- */
- static void chooseCert(String servers, Properties properties) {
- if (servers.contains(SASL_PORT)) {
- properties.put("security.protocol", "SASL_PLAINTEXT");
- properties.put("sasl.mechanism", "PLAIN");
- properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
- + RadiusKnowledgeConfig.KAFKA_SASL_JAAS_USER + " password=" + RadiusKnowledgeConfig.KAFKA_SASL_JAAS_PIN + ";");
- } else if (servers.contains(SSL_PORT)) {
- properties.put("security.protocol", "SSL");
- properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
- properties.put("ssl.keystore.location", RadiusKnowledgeConfig.TOOLS_LIBRARY + "keystore.jks");
- properties.put("ssl.keystore.password", RadiusKnowledgeConfig.KAFKA_SASL_JAAS_PIN);
- properties.put("ssl.truststore.location", RadiusKnowledgeConfig.TOOLS_LIBRARY + "truststore.jks");
- properties.put("ssl.truststore.password", RadiusKnowledgeConfig.KAFKA_SASL_JAAS_PIN);
- properties.put("ssl.key.password", RadiusKnowledgeConfig.KAFKA_SASL_JAAS_PIN);
- }
-
- }
-}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
index bb47bf9..397814f 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
@@ -1,6 +1,5 @@
package com.zdjizhi.utils.kafka;
-import com.zdjizhi.common.RadiusKnowledgeConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -14,30 +13,40 @@ import java.util.Properties;
* @date 2021/6/813:54
*/
public class KafkaConsumer {
- private static Properties createConsumerConfig() {
- Properties properties = new Properties();
- properties.put("bootstrap.servers", RadiusKnowledgeConfig.SOURCE_KAFKA_SERVERS);
- properties.put("group.id", RadiusKnowledgeConfig.GROUP_ID);
- properties.put("session.timeout.ms", RadiusKnowledgeConfig.SESSION_TIMEOUT_MS);
- properties.put("max.poll.records", RadiusKnowledgeConfig.MAX_POLL_RECORDS);
- properties.put("max.partition.fetch.bytes", RadiusKnowledgeConfig.MAX_PARTITION_FETCH_BYTES);
- properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-
- CertUtils.chooseCert(RadiusKnowledgeConfig.SOURCE_KAFKA_SERVERS, properties);
-
- return properties;
- }
-
- public static FlinkKafkaConsumer<String> getKafkaConsumer() {
- FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
- RadiusKnowledgeConfig.SOURCE_KAFKA_TOPIC,
- new SimpleStringSchema(),
- createConsumerConfig());
- kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
- kafkaConsumer.setStartFromGroupOffsets();
+ /**
+ * 官方序列化kafka数据
+ *
+ * @return kafka logs
+ */
+ public static FlinkKafkaConsumer<String> getKafkaConsumer(Properties properties, String topic, String startupMode) {
+
+ setDefaultConfig(properties, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
+ setDefaultConfig(properties, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000);
+ setDefaultConfig(properties, ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 31457280);
+
+ FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
+
+ switch (startupMode) {
+ case "group":
+ kafkaConsumer.setStartFromGroupOffsets();
+ break;
+ case "latest":
+ kafkaConsumer.setStartFromLatest();
+ break;
+ case "earliest":
+ kafkaConsumer.setStartFromEarliest();
+ break;
+ default:
+ kafkaConsumer.setStartFromGroupOffsets();
+ }
return kafkaConsumer;
}
+
+ private static void setDefaultConfig(Properties properties, String key, Object value) {
+ if (!properties.contains(key)) {
+ properties.put(key, value);
+ }
+ }
}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
index 3b88909..c7cd3f2 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
@@ -1,9 +1,7 @@
package com.zdjizhi.utils.kafka;
-import com.zdjizhi.common.RadiusKnowledgeConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Optional;
import java.util.Properties;
@@ -16,35 +14,29 @@ import java.util.Properties;
*/
public class KafkaProducer {
- private static Properties createProducerConfig() {
- Properties properties = new Properties();
- properties.put("bootstrap.servers", RadiusKnowledgeConfig.SINK_KAFKA_SERVERS);
- properties.put("acks", RadiusKnowledgeConfig.PRODUCER_ACK);
- properties.put("retries", RadiusKnowledgeConfig.RETRIES);
- properties.put("linger.ms", RadiusKnowledgeConfig.LINGER_MS);
- properties.put("request.timeout.ms", RadiusKnowledgeConfig.REQUEST_TIMEOUT_MS);
- properties.put("batch.size", RadiusKnowledgeConfig.BATCH_SIZE);
- properties.put("buffer.memory", RadiusKnowledgeConfig.BUFFER_MEMORY);
- properties.put("max.request.size", RadiusKnowledgeConfig.MAX_REQUEST_SIZE);
- properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, RadiusKnowledgeConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
-
- CertUtils.chooseCert(RadiusKnowledgeConfig.SINK_KAFKA_SERVERS, properties);
-
- return properties;
- }
-
-
- public static FlinkKafkaProducer<String> getKafkaProducer() {
- FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
- RadiusKnowledgeConfig.SINK_KAFKA_TOPIC,
+ public static FlinkKafkaProducer<String> getKafkaProducer(Properties properties, String topic, boolean logFailuresOnly) {
+ setDefaultConfig(properties, "ack", 1);
+ setDefaultConfig(properties, "retries", 0);
+ setDefaultConfig(properties, "linger.ms", 10);
+ setDefaultConfig(properties, "request.timeout.ms", 30000);
+ setDefaultConfig(properties, "batch.size", 262144);
+ setDefaultConfig(properties, "buffer.memory", 134217728);
+ setDefaultConfig(properties, "max.request.size", 10485760);
+ setDefaultConfig(properties, "compression.type", "snappy");
+
+ FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
+ topic,
new SimpleStringSchema(),
- createProducerConfig(),
- //sink与所有分区建立连接,轮询写入;
- Optional.empty());
+ properties, Optional.empty());
- //允许producer记录失败日志而不是捕获和抛出它们
- kafkaProducer.setLogFailuresOnly(true);
+ kafkaProducer.setLogFailuresOnly(logFailuresOnly);
return kafkaProducer;
}
+
+ private static void setDefaultConfig(Properties properties, String key, Object value) {
+ if (!properties.contains(key)) {
+ properties.put(key, value);
+ }
+ }
}
diff --git a/src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java b/src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java
deleted file mode 100644
index 58923fe..0000000
--- a/src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package com.zdjizhi.utils.system;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.utils.StringUtil;
-
-import java.io.IOException;
-import java.util.Locale;
-import java.util.Properties;
-
-
-/**
- * @author Administrator
- */
-
-public final class RadiusKnowledgeConfigurations {
- private static final Log logger = LogFactory.get();
-
-
- private static Properties propDefault = new Properties();
- private static Properties propService = new Properties();
-
-
- public static String getStringProperty(Integer type, String key) {
- if (type == 0) {
- return propService.getProperty(key).trim();
- } else if (type == 1) {
- return propDefault.getProperty(key).trim();
- } else {
- return null;
- }
-
- }
-
- public static Integer getIntProperty(Integer type, String key) {
- if (type == 0) {
- return Integer.parseInt(propService.getProperty(key).trim());
- } else if (type == 1) {
- return Integer.parseInt(propDefault.getProperty(key).trim());
- } else {
- return null;
- }
- }
-
- public static Long getLongProperty(Integer type, String key) {
- if (type == 0) {
- return Long.parseLong(propService.getProperty(key).trim());
- } else if (type == 1) {
- return Long.parseLong(propDefault.getProperty(key).trim());
- } else {
- return null;
- }
- }
-
- public static Boolean getBooleanProperty(Integer type, String key) {
- if (type == 0) {
- return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
- } else if (type == 1) {
- return StringUtil.equals(propDefault.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true");
- } else {
- return null;
- }
- }
-
- static {
- try {
- propService.load(RadiusKnowledgeConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
- propDefault.load(RadiusKnowledgeConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties"));
- } catch (IOException | RuntimeException e) {
- propDefault = null;
- propService = null;
- logger.error("Failed to obtain configuration file information,Detailed info:" + e);
- }
- }
-}
diff --git a/src/test/java/com/zdjizhi/JsonTest.java b/src/test/java/com/zdjizhi/JsonTest.java
new file mode 100644
index 0000000..94a8819
--- /dev/null
+++ b/src/test/java/com/zdjizhi/JsonTest.java
@@ -0,0 +1,31 @@
+package com.zdjizhi;
+
+import com.alibaba.fastjson2.JSONPath;
+import com.alibaba.fastjson2.JSONReader;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi
+ * @Description:
+ * @date 2023/11/1611:00
+ */
+public class JsonTest {
+ private static Map<String, JSONPath> jsonPathMap = new ConcurrentHashMap<>(16);
+
+
+ @Test
+ public void jsonPathTest(){
+ String message = "{\"common_stream_dir\":1,\"common_address_type\":4,\"common_client_ip\":\"192.50.50.146\",\"common_server_ip\":\"192.168.40.190\",\"common_client_port\":43063,\"common_server_port\":1813,\"common_c2s_pkt_num\":1,\"common_s2c_pkt_num\":0,\"common_c2s_byte_num\":54,\"common_s2c_byte_num\":0,\"common_start_time\":1700102711,\"common_end_time\":1700102711,\"common_con_duration_ms\":0,\"common_stream_trace_id\":249261754914099,\"common_l4_protocol\":\"IPv4_UDP\",\"common_address_list\":\"43063-1813-192.50.50.146-192.168.40.190\",\"common_in_src_mac\":\"c6:0a:13:99:60:8d\",\"common_in_dest_mac\":\"48:73:97:96:38:1a\",\"common_policy_id\":0,\"common_service\":162,\"common_sled_ip\":\"192.168.40.161\",\"common_schema_type\":\"RADIUS\",\"common_device_tag\":\"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-7400\\\"},{\\\"tag\\\":\\\"data_center\\\",\\\"value\\\":\\\"center-xxg-7400\\\"}]}\",\"common_vsys_id\":1,\"radius_packet_type\":4,\"radius_account\":\"test0157\",\"radius_acct_status_type\":2,\"radius_acct_session_id\":\"9264\",\"radius_framed_ip\":\"192.168.56.157\",\"radius_event_timestamp\":1665480477}";
+
+ JSONReader parser = JSONReader.of(message);
+
+ String dataTypeExpr = "[?(@.radius_packet_type = 1)]";
+
+ JSONPath jsonPath = JSONPath.of(dataTypeExpr);
+ System.out.println(jsonPath.extract(parser));
+ }
+}
diff --git a/src/test/java/com/zdjizhi/testData b/src/test/java/com/zdjizhi/testData
new file mode 100644
index 0000000..904ecd6
--- /dev/null
+++ b/src/test/java/com/zdjizhi/testData
@@ -0,0 +1 @@
+{"common_stream_dir":1,"common_address_type":4,"common_client_ip":"192.50.50.146","common_server_ip":"192.168.40.190","common_client_port":43063,"common_server_port":1813,"common_c2s_pkt_num":1,"common_s2c_pkt_num":0,"common_c2s_byte_num":54,"common_s2c_byte_num":0,"common_start_time":1700102711,"common_end_time":1700102711,"common_con_duration_ms":0,"common_stream_trace_id":249261754914099,"common_l4_protocol":"IPv4_UDP","common_address_list":"43063-1813-192.50.50.146-192.168.40.190","common_in_src_mac":"c6:0a:13:99:60:8d","common_in_dest_mac":"48:73:97:96:38:1a","common_policy_id":0,"common_service":162,"common_sled_ip":"192.168.40.161","common_schema_type":"RADIUS","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-7400\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-7400\"}]}","common_vsys_id":1,"radius_packet_type":4,"radius_account":"test0157","radius_acct_status_type":2,"radius_acct_session_id":"9264","radius_framed_ip":"192.168.56.157","radius_event_timestamp":1665480477}