diff options
| author | qidaijie <[email protected]> | 2023-11-16 11:14:19 +0800 |
|---|---|---|
| committer | qidaijie <[email protected]> | 2023-11-16 11:14:19 +0800 |
| commit | 8214c9255a85d276731c0ccdb7018467af335292 (patch) | |
| tree | 96579242ef006a40d8d01a2a571014d9b31cfff6 | |
| parent | 686a07bbb107b53cb5682aae7423e9d019e59830 (diff) | |
优化配置文件加载方式feature/23.11
16 files changed, 258 insertions, 384 deletions
@@ -6,7 +6,7 @@ <groupId>com.zdjizhi</groupId> <artifactId>radius-account-knowledge</artifactId> - <version>220816-VSYS</version> + <version>1.0.0</version> <name>radius-account-knowledge</name> <url>http://www.example.com</url> @@ -35,12 +35,13 @@ <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flink.version>1.13.1</flink.version> + <galaxy.tools.version>1.2.1</galaxy.tools.version> <hadoop.version>2.7.1</hadoop.version> <kafka.version>1.0.0</kafka.version> <hbase.version>2.2.3</hbase.version> <hutool.version>5.7.17</hutool.version> <jsonpath.version>2.4.0</jsonpath.version> - <fastjson.version>2.0.32</fastjson.version> + <fastjson.version>2.0.40</fastjson.version> <scope.type>provided</scope.type> <!--<scope.type>compile</scope.type>--> </properties> @@ -117,19 +118,9 @@ <dependencies> <dependency> - <groupId>com.zdjizhi</groupId> + <groupId>com.geedgenetworks</groupId> <artifactId>galaxy</artifactId> - <version>1.1.3</version> - <exclusions> - <exclusion> - <artifactId>slf4j-log4j12</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - <exclusion> - <artifactId>log4j-over-slf4j</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - </exclusions> + <version>${galaxy.tools.version}</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core --> diff --git a/properties/application.properties b/properties/application.properties new file mode 100644 index 0000000..c2e98e0 --- /dev/null +++ b/properties/application.properties @@ -0,0 +1,23 @@ +#kafka 接收数据topic +source.kafka.topic=RADIUS-RECORD + +source.kafka.props.bootstrap.servers=192.168.44.12:9094 + +source.kafka.props.group.id=radius-onoff-231116-1 + +source.kafka.props.security.protocol=SASL_PLAINTEXT + +source.kafka.props.sasl.mechanism=PLAIN + +source.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019"; + +#补全数据 输出 topic +sink.kafka.topic=RADIUS-ONFF + +sink.kafka.props.bootstrap.servers=192.168.44.12:9094 + +sink.kafka.props.security.protocol=SASL_PLAINTEXT + +sink.kafka.props.sasl.mechanism=PLAIN + +sink.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
\ No newline at end of file diff --git a/properties/default_config.properties b/properties/default_config.properties deleted file mode 100644 index a683f33..0000000 --- a/properties/default_config.properties +++ /dev/null @@ -1,43 +0,0 @@ -#====================Kafka KafkaConsumer====================# -#kafka source connection timeout -session.timeout.ms=60000 - -#kafka source poll -max.poll.records=3000 - -#kafka source poll bytes -max.partition.fetch.bytes=31457280 - -#====================Kafka KafkaProducer====================# -#producer���ԵĴ������� -retries=0 - -#���ĺ������˵һ��Batch������֮��������ã��������Batch��û��д���������뷢�ͳ�ȥ�� -linger.ms=10 - -#����ڳ�ʱ֮ǰδ�յ���Ӧ���ͻ��˽��ڱ�Ҫʱ���·������� -request.timeout.ms=30000 - -#producer���ǰ���batch���з��͵�,���δ�С��Ĭ��:16384 -batch.size=262144 - -#Producer�����ڻ�����Ϣ�Ļ�������С -#128M -buffer.memory=134217728 - -#�������������ÿ�η���Kafka���������������С,Ĭ��1048576 -#10M -max.request.size=10485760 - -#������ѹ��ģʽ none or snappy -producer.kafka.compression.type=none - -#������ack -producer.ack=1 - -#====================kafka default====================# -#kafka SASL��֤�û���-���� -kafka.user=nsyGpHKGFA4KW0zro9MDdw== - -#kafka SASL��SSL��֤����-���� -kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
\ No newline at end of file diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties deleted file mode 100644 index 05f7466..0000000 --- a/properties/service_flow_config.properties +++ /dev/null @@ -1,20 +0,0 @@ -#--------------------------------kafka消费组信息------------------------------# -#管理kafka地址 -source.kafka.servers=192.168.44.12:9094 - -#kafka 接收数据topic -source.kafka.topic=test - -#读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; -group.id=radius-on-off-flink-20210615 - -#--------------------------------Kafka生产者信息------------------------------# -#管理输出kafka地址 -sink.kafka.servers=192.168.44.12:9094 - -#补全数据 输出 topic -sink.kafka.topic=RADIUS-ONFF - -#--------------------------------topology配置------------------------------# -#定位库地址 -tools.library=D:\\workerspace\\dat\\
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java b/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java deleted file mode 100644 index 04ff500..0000000 --- a/src/main/java/com/zdjizhi/common/RadiusKnowledgeConfig.java +++ /dev/null @@ -1,74 +0,0 @@ -package com.zdjizhi.common; - - -import com.zdjizhi.utils.system.RadiusKnowledgeConfigurations; -import org.jasypt.encryption.pbe.StandardPBEStringEncryptor; - -/** - * @author Administrator - */ -public class RadiusKnowledgeConfig { - - private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor(); - - static { - encryptor.setPassword("galaxy"); - } - - /** - * 4- Accounting-Request(账户授权) - */ - public static final int ACCOUNTING_REQUEST = 4; - /** - * 1、开始计费 - */ - public static final int START_BILLING = 1; - - /** - * 3、过渡计费 - */ - public static final int UPDATE_BILLING = 3; - /** - * 2、停止计费 - */ - public static final int STOP_BILLING = 2; - - /** - * 报文类型 - */ - public static final String RADIUS_PACKET_TYPE = "radius_packet_type"; - - - - /** - * kafka - */ - public static final String SOURCE_KAFKA_SERVERS = RadiusKnowledgeConfigurations.getStringProperty(0, "source.kafka.servers"); - public static final String SOURCE_KAFKA_TOPIC = RadiusKnowledgeConfigurations.getStringProperty(0, "source.kafka.topic"); - public static final String SINK_KAFKA_SERVERS = RadiusKnowledgeConfigurations.getStringProperty(0, "sink.kafka.servers"); - public static final String SINK_KAFKA_TOPIC = RadiusKnowledgeConfigurations.getStringProperty(0, "sink.kafka.topic"); - public static final String GROUP_ID = RadiusKnowledgeConfigurations.getStringProperty(0, "group.id"); - public static final String PRODUCER_ACK = RadiusKnowledgeConfigurations.getStringProperty(1, "producer.ack"); - public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = RadiusKnowledgeConfigurations.getStringProperty(1, "producer.kafka.compression.type"); - - /** - * default config - */ - public static final String RETRIES = RadiusKnowledgeConfigurations.getStringProperty(1, "retries"); - public static final String LINGER_MS = RadiusKnowledgeConfigurations.getStringProperty(1, "linger.ms"); - public static final Integer REQUEST_TIMEOUT_MS = RadiusKnowledgeConfigurations.getIntProperty(1, "request.timeout.ms"); - public static final Integer BATCH_SIZE = RadiusKnowledgeConfigurations.getIntProperty(1, "batch.size"); - public static final Integer BUFFER_MEMORY = RadiusKnowledgeConfigurations.getIntProperty(1, "buffer.memory"); - public static final Integer MAX_REQUEST_SIZE = RadiusKnowledgeConfigurations.getIntProperty(1, "max.request.size"); - public static final String TOOLS_LIBRARY = RadiusKnowledgeConfigurations.getStringProperty(0, "tools.library"); - public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(RadiusKnowledgeConfigurations.getStringProperty(1, "kafka.user")); - public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(RadiusKnowledgeConfigurations.getStringProperty(1, "kafka.pin")); - - /** - * kafka source config - */ - public static final String SESSION_TIMEOUT_MS = RadiusKnowledgeConfigurations.getStringProperty(1, "session.timeout.ms"); - public static final String MAX_POLL_RECORDS = RadiusKnowledgeConfigurations.getStringProperty(1, "max.poll.records"); - public static final String MAX_PARTITION_FETCH_BYTES = RadiusKnowledgeConfigurations.getStringProperty(1, "max.partition.fetch.bytes"); - -}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/config/RadiusKnowledgeConfigs.java b/src/main/java/com/zdjizhi/config/RadiusKnowledgeConfigs.java new file mode 100644 index 0000000..0221451 --- /dev/null +++ b/src/main/java/com/zdjizhi/config/RadiusKnowledgeConfigs.java @@ -0,0 +1,52 @@ +package com.zdjizhi.config; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +/** + * Containing configuration options for the Fusion application. + * + * @author chaoc + * @since 1.0 + */ +public class RadiusKnowledgeConfigs { + + /** + * The prefix for Kafka properties used in the source. + */ + public static final String SOURCE_KAFKA_PROPERTIES_PREFIX = "source.kafka.props."; + + /** + * The prefix for Kafka properties used in the sink. + */ + public static final String SINK_KAFKA_PROPERTIES_PREFIX = "sink.kafka.props."; + + + public static final ConfigOption<String> SOURCE_KAFKA_TOPIC = + ConfigOptions.key("source.kafka.topic") + .stringType() + .noDefaultValue() + .withDescription("The Kafka topic used in the source."); + + + public static final ConfigOption<String> SINK_KAFKA_TOPIC = + ConfigOptions.key("sink.kafka.topic") + .stringType() + .noDefaultValue() + .withDescription("The Kafka topic used in the sink."); + + + public static final ConfigOption<String> STARTUP_MODE = + ConfigOptions.key("startup.mode") + .stringType() + .defaultValue("group") + .withDescription("The offset commit mode for the consumer."); + + + public static final ConfigOption<Boolean> LOG_FAILURES_ONLY = + ConfigOptions.key("log.failures.only") + .booleanType() + .defaultValue(false) + .withDescription("Defines whether the producer should fail on errors, or only log them."); + +}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/config/RadiusKnowledgeConfiguration.java b/src/main/java/com/zdjizhi/config/RadiusKnowledgeConfiguration.java new file mode 100644 index 0000000..06359e4 --- /dev/null +++ b/src/main/java/com/zdjizhi/config/RadiusKnowledgeConfiguration.java @@ -0,0 +1,44 @@ +package com.zdjizhi.config; + +import org.apache.flink.configuration.Configuration; + +import java.util.Properties; + +/** + * A wrapper class that extends the Flink `Configuration` to provide utility methods for handling + * properties with a specific prefix. This class allows retrieving properties that start with the + * given `prefix` and converts them into a `java.util.Properties` object. + * + * @author chaoc + * @since 1.0 + */ + +public class RadiusKnowledgeConfiguration { + private final Configuration config; + + public RadiusKnowledgeConfiguration(final Configuration config) { + this.config = config; + } + + /** + * Retrieves properties from the underlying `Configuration` instance that start with the specified + * `prefix`. The properties are then converted into a `java.util.Properties` object and returned. + * + * @param prefix The prefix to filter properties. + * @return A `java.util.Properties` object containing the properties with the specified prefix. + */ + public Properties getProperties(final String prefix) { + if (prefix == null) { + final Properties props = new Properties(); + props.putAll(config.toMap()); + return props; + } + return config.toMap() + .entrySet() + .stream() + .filter(entry -> entry.getKey().startsWith(prefix)) + .collect(Properties::new, (props, e) -> + props.setProperty(e.getKey().substring(prefix.length()), e.getValue()), + Properties::putAll); + } +} diff --git a/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java b/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java index afd714a..6edb0ee 100644 --- a/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java +++ b/src/main/java/com/zdjizhi/topology/RadiusKnowledgeTopology.java @@ -1,16 +1,18 @@ package com.zdjizhi.topology; -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.zdjizhi.common.RadiusKnowledgeConfig; -import com.zdjizhi.utils.functions.FilterNullFunction; -import com.zdjizhi.utils.functions.MapCompletedFunction; + +import com.zdjizhi.config.RadiusKnowledgeConfiguration; +import com.zdjizhi.utils.functions.ConversionProcess; import com.zdjizhi.utils.kafka.KafkaConsumer; import com.zdjizhi.utils.kafka.KafkaProducer; -import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import static com.zdjizhi.config.RadiusKnowledgeConfigs.*; + /** * @author qidaijie * @Package com.zdjizhi.topology @@ -18,23 +20,37 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; * @date 2021/5/2016:42 */ public class RadiusKnowledgeTopology { - private static final Log logger = LogFactory.get(); - public static void main(String[] args) { + public static void main(String[] args) throws Exception { + + // param check + if (args.length < 1) { + throw new IllegalArgumentException("Error: Not found properties path. " + + "\nUsage: flink -c xxx xxx.jar app.properties."); + } + final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); - DataStreamSource<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer()); + ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]); + final Configuration config = tool.getConfiguration(); + environment.getConfig().setGlobalJobParameters(config); + final RadiusKnowledgeConfiguration fusionConfiguration = new RadiusKnowledgeConfiguration(config); - DataStream<String> result = streamSource.map(new MapCompletedFunction()).name(RadiusKnowledgeConfig.SOURCE_KAFKA_TOPIC) - .filter(new FilterNullFunction()).name("FilterAbnormalData"); + DataStreamSource<String> streamSource = environment.addSource( + KafkaConsumer.getKafkaConsumer(fusionConfiguration.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX), + config.get(SOURCE_KAFKA_TOPIC), + config.get(STARTUP_MODE))); - result.addSink(KafkaProducer.getKafkaProducer()).name(RadiusKnowledgeConfig.SINK_KAFKA_TOPIC); - try { - environment.execute("RADIUS-ON-OFF"); - } catch (Exception e) { - logger.error("This Flink task start ERROR! Exception information is :" + e); - } + SingleOutputStreamOperator<String> process = streamSource.process(new ConversionProcess()); + + + process.addSink(KafkaProducer.getKafkaProducer(fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX), + config.get(SINK_KAFKA_TOPIC), + config.get(LOG_FAILURES_ONLY))); + + + environment.execute("RADIUS-ON-OFF"); } diff --git a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java b/src/main/java/com/zdjizhi/utils/functions/ConversionProcess.java index 40eef71..5581b19 100644 --- a/src/main/java/com/zdjizhi/utils/functions/MapCompletedFunction.java +++ b/src/main/java/com/zdjizhi/utils/functions/ConversionProcess.java @@ -3,46 +3,39 @@ package com.zdjizhi.utils.functions; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.alibaba.fastjson2.JSONObject; -import com.zdjizhi.common.RadiusKnowledgeConfig; -import com.zdjizhi.utils.StringUtil; -import org.apache.flink.api.common.functions.MapFunction; +import com.alibaba.fastjson2.JSONPath; +import com.geedgenetworks.utils.StringUtil; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; /** * @author qidaijie * @Package com.zdjizhi.utils.functions * @Description: - * @date 2021/5/2715:01 + * @date 2023/11/1610:30 */ -public class MapCompletedFunction implements MapFunction<String, String> { +public class ConversionProcess extends ProcessFunction<String, String> { private static final Log logger = LogFactory.get(); + private static final String dataTypeExpr = "[?(@.radius_packet_type = 4)]"; + @Override - public String map(String logs) { - String result = null; + public void processElement(String message, Context ctx, Collector<String> out) throws Exception { try { - if (StringUtil.isNotBlank(logs)) { - JSONObject radiusLog = JSONObject.parse(logs); - if (radiusLog.containsKey(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE)) { - int packetType = radiusLog.getIntValue(RadiusKnowledgeConfig.RADIUS_PACKET_TYPE); - if (RadiusKnowledgeConfig.ACCOUNTING_REQUEST == packetType) { - result = GetKnowledgeLogs(radiusLog); - } - + if (StringUtil.isNotBlank(message)) { + Object isAccountingData = JSONPath.eval(message, dataTypeExpr); + if (isAccountingData != null) { + JSONObject radiusLog = JSONObject.parse(message); + GetKnowledgeLogs(radiusLog, out); } } } catch (RuntimeException e) { logger.error("Radius log parsing exception,Detailed info:" + e); } - return result; } - /** - * 获取RadiusOnOff日志 - * - * @param radiusLog 原始日志 - * @return OnOff日志 - */ - private static String GetKnowledgeLogs(JSONObject radiusLog) { + + private static void GetKnowledgeLogs(JSONObject radiusLog, Collector<String> out) { JSONObject knowledge = new JSONObject(); String framedIp = radiusLog.getString("radius_framed_ip"); @@ -76,8 +69,7 @@ public class MapCompletedFunction implements MapFunction<String, String> { //用户的在线时长,以秒为单位,下线用户无此属性,默认为0 knowledge.put("acct_session_time", radiusLog.getLongValue("radius_acct_session_time", 0L)); - return knowledge.toString(); + out.collect(knowledge.toString()); } - return ""; } } diff --git a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java b/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java deleted file mode 100644 index de507ad..0000000 --- a/src/main/java/com/zdjizhi/utils/functions/FilterNullFunction.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.zdjizhi.utils.functions; - -import com.zdjizhi.utils.StringUtil; -import org.apache.flink.api.common.functions.FilterFunction; - -/** - * @author qidaijie - * @Package com.zdjizhi.utils.functions - * @Description: - * @date 2021/5/2715:01 - */ -public class FilterNullFunction implements FilterFunction<String> { - @Override - public boolean filter(String message) { - return StringUtil.isNotBlank(message); - } -} diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java deleted file mode 100644 index 88bc377..0000000 --- a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.zdjizhi.utils.kafka; - -import com.zdjizhi.common.RadiusKnowledgeConfig; -import org.apache.kafka.common.config.SslConfigs; - -import java.util.Properties; - -/** - * @author qidaijie - * @Package com.zdjizhi.utils.kafka - * @Description: - * @date 2021/9/610:37 - */ -class CertUtils { - /** - * Kafka SASL认证端口 - */ - private static final String SASL_PORT = "9094"; - - /** - * Kafka SSL认证端口 - */ - private static final String SSL_PORT = "9095"; - - /** - * 根据连接信息端口判断认证方式。 - * - * @param servers kafka 连接信息 - * @param properties kafka 连接配置信息 - */ - static void chooseCert(String servers, Properties properties) { - if (servers.contains(SASL_PORT)) { - properties.put("security.protocol", "SASL_PLAINTEXT"); - properties.put("sasl.mechanism", "PLAIN"); - properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" - + RadiusKnowledgeConfig.KAFKA_SASL_JAAS_USER + " password=" + RadiusKnowledgeConfig.KAFKA_SASL_JAAS_PIN + ";"); - } else if (servers.contains(SSL_PORT)) { - properties.put("security.protocol", "SSL"); - properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); - properties.put("ssl.keystore.location", RadiusKnowledgeConfig.TOOLS_LIBRARY + "keystore.jks"); - properties.put("ssl.keystore.password", RadiusKnowledgeConfig.KAFKA_SASL_JAAS_PIN); - properties.put("ssl.truststore.location", RadiusKnowledgeConfig.TOOLS_LIBRARY + "truststore.jks"); - properties.put("ssl.truststore.password", RadiusKnowledgeConfig.KAFKA_SASL_JAAS_PIN); - properties.put("ssl.key.password", RadiusKnowledgeConfig.KAFKA_SASL_JAAS_PIN); - } - - } -} diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java index bb47bf9..397814f 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java @@ -1,6 +1,5 @@ package com.zdjizhi.utils.kafka; -import com.zdjizhi.common.RadiusKnowledgeConfig; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -14,30 +13,40 @@ import java.util.Properties; * @date 2021/6/813:54 */ public class KafkaConsumer { - private static Properties createConsumerConfig() { - Properties properties = new Properties(); - properties.put("bootstrap.servers", RadiusKnowledgeConfig.SOURCE_KAFKA_SERVERS); - properties.put("group.id", RadiusKnowledgeConfig.GROUP_ID); - properties.put("session.timeout.ms", RadiusKnowledgeConfig.SESSION_TIMEOUT_MS); - properties.put("max.poll.records", RadiusKnowledgeConfig.MAX_POLL_RECORDS); - properties.put("max.partition.fetch.bytes", RadiusKnowledgeConfig.MAX_PARTITION_FETCH_BYTES); - properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - - CertUtils.chooseCert(RadiusKnowledgeConfig.SOURCE_KAFKA_SERVERS, properties); - - return properties; - } - - public static FlinkKafkaConsumer<String> getKafkaConsumer() { - FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>( - RadiusKnowledgeConfig.SOURCE_KAFKA_TOPIC, - new SimpleStringSchema(), - createConsumerConfig()); - kafkaConsumer.setCommitOffsetsOnCheckpoints(true); - kafkaConsumer.setStartFromGroupOffsets(); + /** + * 官方序列化kafka数据 + * + * @return kafka logs + */ + public static FlinkKafkaConsumer<String> getKafkaConsumer(Properties properties, String topic, String startupMode) { + + setDefaultConfig(properties, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000); + setDefaultConfig(properties, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000); + setDefaultConfig(properties, ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 31457280); + + FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties); + + switch (startupMode) { + case "group": + kafkaConsumer.setStartFromGroupOffsets(); + break; + case "latest": + kafkaConsumer.setStartFromLatest(); + break; + case "earliest": + kafkaConsumer.setStartFromEarliest(); + break; + default: + kafkaConsumer.setStartFromGroupOffsets(); + } return kafkaConsumer; } + + private static void setDefaultConfig(Properties properties, String key, Object value) { + if (!properties.contains(key)) { + properties.put(key, value); + } + } } diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java index 3b88909..c7cd3f2 100644 --- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java +++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java @@ -1,9 +1,7 @@ package com.zdjizhi.utils.kafka; -import com.zdjizhi.common.RadiusKnowledgeConfig; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; import java.util.Optional; import java.util.Properties; @@ -16,35 +14,29 @@ import java.util.Properties; */ public class KafkaProducer { - private static Properties createProducerConfig() { - Properties properties = new Properties(); - properties.put("bootstrap.servers", RadiusKnowledgeConfig.SINK_KAFKA_SERVERS); - properties.put("acks", RadiusKnowledgeConfig.PRODUCER_ACK); - properties.put("retries", RadiusKnowledgeConfig.RETRIES); - properties.put("linger.ms", RadiusKnowledgeConfig.LINGER_MS); - properties.put("request.timeout.ms", RadiusKnowledgeConfig.REQUEST_TIMEOUT_MS); - properties.put("batch.size", RadiusKnowledgeConfig.BATCH_SIZE); - properties.put("buffer.memory", RadiusKnowledgeConfig.BUFFER_MEMORY); - properties.put("max.request.size", RadiusKnowledgeConfig.MAX_REQUEST_SIZE); - properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, RadiusKnowledgeConfig.PRODUCER_KAFKA_COMPRESSION_TYPE); - - CertUtils.chooseCert(RadiusKnowledgeConfig.SINK_KAFKA_SERVERS, properties); - - return properties; - } - - - public static FlinkKafkaProducer<String> getKafkaProducer() { - FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>( - RadiusKnowledgeConfig.SINK_KAFKA_TOPIC, + public static FlinkKafkaProducer<String> getKafkaProducer(Properties properties, String topic, boolean logFailuresOnly) { + setDefaultConfig(properties, "ack", 1); + setDefaultConfig(properties, "retries", 0); + setDefaultConfig(properties, "linger.ms", 10); + setDefaultConfig(properties, "request.timeout.ms", 30000); + setDefaultConfig(properties, "batch.size", 262144); + setDefaultConfig(properties, "buffer.memory", 134217728); + setDefaultConfig(properties, "max.request.size", 10485760); + setDefaultConfig(properties, "compression.type", "snappy"); + + FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>( + topic, new SimpleStringSchema(), - createProducerConfig(), - //sink与所有分区建立连接,轮询写入; - Optional.empty()); + properties, Optional.empty()); - //允许producer记录失败日志而不是捕获和抛出它们 - kafkaProducer.setLogFailuresOnly(true); + kafkaProducer.setLogFailuresOnly(logFailuresOnly); return kafkaProducer; } + + private static void setDefaultConfig(Properties properties, String key, Object value) { + if (!properties.contains(key)) { + properties.put(key, value); + } + } } diff --git a/src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java b/src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java deleted file mode 100644 index 58923fe..0000000 --- a/src/main/java/com/zdjizhi/utils/system/RadiusKnowledgeConfigurations.java +++ /dev/null @@ -1,75 +0,0 @@ -package com.zdjizhi.utils.system; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.zdjizhi.utils.StringUtil; - -import java.io.IOException; -import java.util.Locale; -import java.util.Properties; - - -/** - * @author Administrator - */ - -public final class RadiusKnowledgeConfigurations { - private static final Log logger = LogFactory.get(); - - - private static Properties propDefault = new Properties(); - private static Properties propService = new Properties(); - - - public static String getStringProperty(Integer type, String key) { - if (type == 0) { - return propService.getProperty(key).trim(); - } else if (type == 1) { - return propDefault.getProperty(key).trim(); - } else { - return null; - } - - } - - public static Integer getIntProperty(Integer type, String key) { - if (type == 0) { - return Integer.parseInt(propService.getProperty(key).trim()); - } else if (type == 1) { - return Integer.parseInt(propDefault.getProperty(key).trim()); - } else { - return null; - } - } - - public static Long getLongProperty(Integer type, String key) { - if (type == 0) { - return Long.parseLong(propService.getProperty(key).trim()); - } else if (type == 1) { - return Long.parseLong(propDefault.getProperty(key).trim()); - } else { - return null; - } - } - - public static Boolean getBooleanProperty(Integer type, String key) { - if (type == 0) { - return StringUtil.equals(propService.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true"); - } else if (type == 1) { - return StringUtil.equals(propDefault.getProperty(key).toLowerCase().trim().toUpperCase(Locale.ENGLISH), "true"); - } else { - return null; - } - } - - static { - try { - propService.load(RadiusKnowledgeConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties")); - propDefault.load(RadiusKnowledgeConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties")); - } catch (IOException | RuntimeException e) { - propDefault = null; - propService = null; - logger.error("Failed to obtain configuration file information,Detailed info:" + e); - } - } -} diff --git a/src/test/java/com/zdjizhi/JsonTest.java b/src/test/java/com/zdjizhi/JsonTest.java new file mode 100644 index 0000000..94a8819 --- /dev/null +++ b/src/test/java/com/zdjizhi/JsonTest.java @@ -0,0 +1,31 @@ +package com.zdjizhi; + +import com.alibaba.fastjson2.JSONPath; +import com.alibaba.fastjson2.JSONReader; +import org.junit.Test; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author qidaijie + * @Package com.zdjizhi + * @Description: + * @date 2023/11/1611:00 + */ +public class JsonTest { + private static Map<String, JSONPath> jsonPathMap = new ConcurrentHashMap<>(16); + + + @Test + public void jsonPathTest(){ + String message = "{\"common_stream_dir\":1,\"common_address_type\":4,\"common_client_ip\":\"192.50.50.146\",\"common_server_ip\":\"192.168.40.190\",\"common_client_port\":43063,\"common_server_port\":1813,\"common_c2s_pkt_num\":1,\"common_s2c_pkt_num\":0,\"common_c2s_byte_num\":54,\"common_s2c_byte_num\":0,\"common_start_time\":1700102711,\"common_end_time\":1700102711,\"common_con_duration_ms\":0,\"common_stream_trace_id\":249261754914099,\"common_l4_protocol\":\"IPv4_UDP\",\"common_address_list\":\"43063-1813-192.50.50.146-192.168.40.190\",\"common_in_src_mac\":\"c6:0a:13:99:60:8d\",\"common_in_dest_mac\":\"48:73:97:96:38:1a\",\"common_policy_id\":0,\"common_service\":162,\"common_sled_ip\":\"192.168.40.161\",\"common_schema_type\":\"RADIUS\",\"common_device_tag\":\"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-7400\\\"},{\\\"tag\\\":\\\"data_center\\\",\\\"value\\\":\\\"center-xxg-7400\\\"}]}\",\"common_vsys_id\":1,\"radius_packet_type\":4,\"radius_account\":\"test0157\",\"radius_acct_status_type\":2,\"radius_acct_session_id\":\"9264\",\"radius_framed_ip\":\"192.168.56.157\",\"radius_event_timestamp\":1665480477}"; + + JSONReader parser = JSONReader.of(message); + + String dataTypeExpr = "[?(@.radius_packet_type = 1)]"; + + JSONPath jsonPath = JSONPath.of(dataTypeExpr); + System.out.println(jsonPath.extract(parser)); + } +} diff --git a/src/test/java/com/zdjizhi/testData b/src/test/java/com/zdjizhi/testData new file mode 100644 index 0000000..904ecd6 --- /dev/null +++ b/src/test/java/com/zdjizhi/testData @@ -0,0 +1 @@ +{"common_stream_dir":1,"common_address_type":4,"common_client_ip":"192.50.50.146","common_server_ip":"192.168.40.190","common_client_port":43063,"common_server_port":1813,"common_c2s_pkt_num":1,"common_s2c_pkt_num":0,"common_c2s_byte_num":54,"common_s2c_byte_num":0,"common_start_time":1700102711,"common_end_time":1700102711,"common_con_duration_ms":0,"common_stream_trace_id":249261754914099,"common_l4_protocol":"IPv4_UDP","common_address_list":"43063-1813-192.50.50.146-192.168.40.190","common_in_src_mac":"c6:0a:13:99:60:8d","common_in_dest_mac":"48:73:97:96:38:1a","common_policy_id":0,"common_service":162,"common_sled_ip":"192.168.40.161","common_schema_type":"RADIUS","common_device_tag":"{\"tags\":[{\"tag\":\"device_group\",\"value\":\"group-xxg-7400\"},{\"tag\":\"data_center\",\"value\":\"center-xxg-7400\"}]}","common_vsys_id":1,"radius_packet_type":4,"radius_account":"test0157","radius_acct_status_type":2,"radius_acct_session_id":"9264","radius_framed_ip":"192.168.56.157","radius_event_timestamp":1665480477} |
