author    qidaijie <[email protected]>    2021-09-27 11:11:56 +0800
committer qidaijie <[email protected]>    2021-09-27 11:11:56 +0800
commit    4b68261130196464960aedec8c01ede7e17ab54c (patch)
tree      aee7fbea05b2859613937aef39f8db55ef7e4b25
parent    fcd97b7aabe48c5ce73403059481acb0040b3243 (diff)
Update to the 2109 version
-rw-r--r--  pom.xml  33
-rw-r--r--  properties/default_config.properties  23
-rw-r--r--  properties/service_flow_config.properties  35
-rw-r--r--  src/main/java/com/zdjizhi/common/DefaultProConfig.java  21
-rw-r--r--  src/main/java/com/zdjizhi/common/FlowWriteConfig.java  33
-rw-r--r--  src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java  18
-rw-r--r--  src/main/java/com/zdjizhi/utils/general/SnowflakeId.java  8
-rw-r--r--  src/main/java/com/zdjizhi/utils/general/TransFunction.java  19
-rw-r--r--  src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java  3
-rw-r--r--  src/main/java/com/zdjizhi/utils/kafka/CertUtils.java  36
-rw-r--r--  src/main/java/com/zdjizhi/utils/kafka/Consumer.java  7
-rw-r--r--  src/main/java/com/zdjizhi/utils/kafka/Producer.java  31
-rw-r--r--  src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java  3
-rw-r--r--  src/test/java/com/zdjizhi/KafkaLogSend.java  92
-rw-r--r--  src/test/java/com/zdjizhi/KafkaTest.java  14
15 files changed, 169 insertions, 207 deletions
diff --git a/pom.xml b/pom.xml
index c7ddb06..bad0fba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>log-completion-schema</artifactId>
- <version>20210728</version>
+ <version>210908-security</version>
<name>log-completion-schema</name>
<url>http://www.example.com</url>
@@ -135,13 +135,13 @@
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table</artifactId>
- <version>${flink.version}</version>
- <type>pom</type>
- <scope>${scope.type}</scope>
- </dependency>
+ <!--<dependency>-->
+ <!--<groupId>org.apache.flink</groupId>-->
+ <!--<artifactId>flink-table</artifactId>-->
+ <!--<version>${flink.version}</version>-->
+ <!--<type>pom</type>-->
+ <!--<scope>${scope.type}</scope>-->
+ <!--</dependency>-->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
@@ -155,23 +155,32 @@
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.11</artifactId>
+ <artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
- <!--<scope>${scope.type}</scope>-->
+ <scope>${scope.type}</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_2.11</artifactId>
+ <artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
<scope>${scope.type}</scope>
</dependency>
+ <!--test-->
+ <!--<dependency>-->
+ <!--<groupId>org.apache.flink</groupId>-->
+ <!--<artifactId>flink-connector-hbase-2.2_2.12</artifactId>-->
+ <!--<version>${flink.version}</version>-->
+ <!--<scope>${scope.type}</scope>-->
+ <!--</dependency>-->
+ <!--test-->
+
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka_2.11</artifactId>
+ <artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
<!--<scope>${scope.type}</scope>-->
</dependency>
diff --git a/properties/default_config.properties b/properties/default_config.properties
index d82130d..c11eeb7 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -22,8 +22,29 @@ buffer.memory=134217728
#10M
max.request.size=10485760
+#kafka SASL authentication username
+kafka.user=admin
+
+#kafka SASL or SSL authentication password
+kafka.pin=galaxy2019
+
+#kafka source connection timeout
+session.timeout.ms=60000
+
+#kafka source poll
+max.poll.records=3000
+
+#kafka source poll bytes
+max.partition.fetch.bytes=31457280
+
#hbase table name
hbase.table.name=subscriber_info
#mail default charset
-mail.default.charset=UTF-8
\ No newline at end of file
+mail.default.charset=UTF-8
+
+#kafka source protocol; SSL or SASL
+kafka.source.protocol=SASL
+
+#kafka sink protocol; SSL or SASL
+kafka.sink.protocol=SASL
\ No newline at end of file
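Note: a minimal sketch, not part of this commit, of where the three new source-tuning keys above typically land when a Kafka consumer configuration is assembled; the class and hard-coded values below are illustrative only (they simply repeat the defaults from this file).

import java.util.Properties;

// Illustrative only: in this repository the values are read through FlowWriteConfig instead of being hard-coded.
public class SourceTuningSketch {
    public static Properties sourceTuning() {
        Properties props = new Properties();
        props.put("session.timeout.ms", "60000");            // consumer session timeout
        props.put("max.poll.records", "3000");               // records returned per poll()
        props.put("max.partition.fetch.bytes", "31457280");  // ~30 MB per-partition fetch cap
        return props;
    }

    public static void main(String[] args) {
        System.out.println(sourceTuning());
    }
}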
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 9bb2f84..362a264 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,37 +1,37 @@
#--------------------------------Address configuration------------------------------#
#management kafka address
-input.kafka.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
+input.kafka.servers=192.168.44.12:9091
#management output kafka address
-output.kafka.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
+output.kafka.servers=192.168.44.12:9091
#zookeeper address, used to configure log_id
-zookeeper.servers=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181
+zookeeper.servers=192.168.44.12:2181
#hbase zookeeper address, used to connect to HBase
-hbase.zookeeper.servers=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181
+hbase.zookeeper.servers=192.168.44.12:2181
#--------------------------------HTTP / location library------------------------------#
#location library path
-ip.library=/home/bigdata/topology/dat/
+tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
#gateway schema location
-schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/connection_record_log
+schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/session_record
#gateway APP_ID lookup API
-app.id.http=http://192.168.44.67:9999/open-api/appDicList
+app.id.http=http://192.168.44.12:9999/open-api/appDicList
#--------------------------------Kafka consumer group info------------------------------#
#kafka topic that receives input data
-input.kafka.topic=CONNECTION-RECORD-LOG
+input.kafka.topic=SESSION-RECORD
#output topic for the enriched data
-output.kafka.topic=CONNECTION-RECORD-COMPLETED-LOG
+output.kafka.topic=SESSION-RECORD-COMPLETED
#consumer group for the input topic; stores the consumed offsets for this spout id (can be named after the topology), so the next read does not re-consume the same data;
-group.id=connection-record-flink-20210809
+group.id=session-record-log-20210902-1
#producer compression mode, none or snappy
producer.kafka.compression.type=none
@@ -39,22 +39,13 @@ producer.kafka.compression.type=none
#producer ack
producer.ack=1
-#client-id of the consumer reading from kafka
-consumer.client.id=consumer-connection-record
-
-#client-id of the producer writing back to kafka
-producer.client.id=producer-connection-record
-
#--------------------------------topology configuration------------------------------#
#consumer parallelism
-consumer.parallelism=3
-
-#map function parallelism
-map.parallelism=3
+consumer.parallelism=1
-#producer parallelism
-producer.parallelism=3
+#transform function parallelism
+transform.parallelism=1
#data center id, valid range (0-63)
data.center.id.num=0
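Note: the sketch below is an assumed, rough outline of a two-file loader consistent with the FlowWriteConfigurations.getStringProperty(index, key) calls seen elsewhere in this diff (index 0 appears to map to service_flow_config.properties, index 1 to default_config.properties); the real implementation is not shown in this commit.

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

// Hypothetical loader: index 0 -> service_flow_config.properties, index 1 -> default_config.properties.
public final class ConfigLoaderSketch {
    private static final Properties[] FILES = {
            load("properties/service_flow_config.properties"),
            load("properties/default_config.properties")
    };

    private static Properties load(String path) {
        Properties p = new Properties();
        try (FileInputStream in = new FileInputStream(path)) {
            p.load(in);
        } catch (IOException e) {
            throw new IllegalStateException("Cannot load " + path, e);
        }
        return p;
    }

    public static String getStringProperty(int index, String key) {
        return FILES[index].getProperty(key);
    }

    public static Integer getIntProperty(int index, String key) {
        return Integer.valueOf(FILES[index].getProperty(key));
    }
}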
diff --git a/src/main/java/com/zdjizhi/common/DefaultProConfig.java b/src/main/java/com/zdjizhi/common/DefaultProConfig.java
deleted file mode 100644
index b98ea53..0000000
--- a/src/main/java/com/zdjizhi/common/DefaultProConfig.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package com.zdjizhi.common;
-
-
-import com.zdjizhi.utils.system.FlowWriteConfigurations;
-
-/**
- * @author Administrator
- */
-public class DefaultProConfig {
-
-
- public static final String RETRIES = FlowWriteConfigurations.getStringProperty(1, "retries");
- public static final String LINGER_MS = FlowWriteConfigurations.getStringProperty(1, "linger.ms");
- public static final Integer REQUEST_TIMEOUT_MS = FlowWriteConfigurations.getIntProperty(1, "request.timeout.ms");
- public static final Integer BATCH_SIZE = FlowWriteConfigurations.getIntProperty(1, "batch.size");
- public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory");
- public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size");
- public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name");
-
-
-}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index bf82757..aa3c757 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -15,22 +15,29 @@ public class FlowWriteConfig {
public static final String IF_CONDITION_SPLITTER = "=";
public static final String MODEL = "remote";
public static final String PROTOCOL_SPLITTER = "\\.";
+
/**
- * System
+ * System config
*/
public static final Integer CONSUMER_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "consumer.parallelism");
- public static final Integer MAP_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "map.parallelism");
- public static final Integer PRODUCER_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "producer.parallelism");
+ public static final Integer TRANSFORM_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "transform.parallelism");
public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
public static final Integer APP_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "app.tick.tuple.freq.secs");
public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num");
public static final Integer LOG_NEED_COMPLETE = FlowWriteConfigurations.getIntProperty(0, "log.need.complete");
public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(0, "mail.default.charset");
+ public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name");
+ /**
+ * kafka source config
+ */
+ public static final String SESSION_TIMEOUT_MS = FlowWriteConfigurations.getStringProperty(1, "session.timeout.ms");
+ public static final String MAX_POLL_RECORDS = FlowWriteConfigurations.getStringProperty(1, "max.poll.records");
+ public static final String MAX_PARTITION_FETCH_BYTES = FlowWriteConfigurations.getStringProperty(1, "max.partition.fetch.bytes");
/**
- * kafka
+ * kafka sink config
*/
public static final String INPUT_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "input.kafka.servers");
public static final String OUTPUT_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "output.kafka.servers");
@@ -40,14 +47,22 @@ public class FlowWriteConfig {
public static final String OUTPUT_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "output.kafka.topic");
public static final String INPUT_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "input.kafka.topic");
public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(0, "producer.ack");
- public static final String IP_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "ip.library");
+ public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");
+ public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");
+ public static final String KAFKA_SOURCE_PROTOCOL = FlowWriteConfigurations.getStringProperty(1, "kafka.source.protocol");
+ public static final String KAFKA_SINK_PROTOCOL = FlowWriteConfigurations.getStringProperty(1, "kafka.sink.protocol");
+ public static final String KAFKA_USER = FlowWriteConfigurations.getStringProperty(1, "kafka.user");
+ public static final String KAFKA_PIN = FlowWriteConfigurations.getStringProperty(1, "kafka.pin");
/**
- * kafka rate-limiting config-20201117
+ * connection kafka
*/
- public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");
- public static final String CONSUMER_CLIENT_ID = FlowWriteConfigurations.getStringProperty(0, "consumer.client.id");
- public static final String PRODUCER_CLIENT_ID = FlowWriteConfigurations.getStringProperty(0, "producer.client.id");
+ public static final String RETRIES = FlowWriteConfigurations.getStringProperty(1, "retries");
+ public static final String LINGER_MS = FlowWriteConfigurations.getStringProperty(1, "linger.ms");
+ public static final Integer REQUEST_TIMEOUT_MS = FlowWriteConfigurations.getIntProperty(1, "request.timeout.ms");
+ public static final Integer BATCH_SIZE = FlowWriteConfigurations.getIntProperty(1, "batch.size");
+ public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory");
+ public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size");
/**
* http
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index a9b38ca..5c89522 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -32,22 +32,28 @@ public class LogFlowWriteTopology {
if (FlowWriteConfig.LOG_NEED_COMPLETE == 1) {
//process the raw logs: enrichment, transformation, etc.
- DataStream<String> cleaningLog = streamSource.map(new MapCompletedFunction())
- .name("TransFormLogs").setParallelism(FlowWriteConfig.MAP_PARALLELISM);
+ DataStream<String> cleaningLog = streamSource.map(new MapCompletedFunction()).name("TransFormLogs")
+ .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
//filter out empty records so they are not sent to Kafka
- DataStream<String> result = cleaningLog.filter(new FilterNullFunction()).name("FilterAbnormalData");
+ DataStream<String> result = cleaningLog.filter(new FilterNullFunction()).name("FilterAbnormalData")
+ .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
//send the data to Kafka
result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka")
- .setParallelism(FlowWriteConfig.PRODUCER_PARALLELISM);
+ .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
} else {
- DataStream<String> result = streamSource.filter(new FilterNullFunction()).name("FilterOriginalData");
- result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka").setParallelism(FlowWriteConfig.PRODUCER_PARALLELISM);
+ //filter out empty records so they are not sent to Kafka
+ DataStream<String> result = streamSource.filter(new FilterNullFunction()).name("FilterOriginalData")
+ .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
+ //send the data to Kafka
+ result.addSink(Producer.getKafkaProducer()).name("LogSinkKafka")
+ .setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
}
try {
environment.execute(args[0]);
} catch (Exception e) {
logger.error("This Flink task start ERROR! Exception information is :" + e);
+ e.printStackTrace();
}
}
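Note: the hunk above collapses the former map/producer parallelism settings into the single TRANSFORM_PARALLELISM value. A stripped-down sketch of the resulting chain follows; the lambdas stand in for MapCompletedFunction, FilterNullFunction and the Kafka sink, so the pipeline is illustrative rather than the full topology.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Illustrative pipeline: one parallelism value shared by the transform, filter and sink stages.
public class PipelineSketch {
    public static void main(String[] args) throws Exception {
        final int transformParallelism = 1; // FlowWriteConfig.TRANSFORM_PARALLELISM in the real topology

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> source = env.fromElements("{\"a\":1}", "", "{\"b\":2}");

        source
                .map(String::trim).name("TransFormLogs")                      // stand-in for MapCompletedFunction
                .setParallelism(transformParallelism)
                .filter(value -> !value.isEmpty()).name("FilterAbnormalData") // stand-in for FilterNullFunction
                .setParallelism(transformParallelism)
                .print().name("LogSinkKafka")                                 // stand-in for the Kafka sink
                .setParallelism(transformParallelism);

        env.execute("log-completion-sketch");
    }
}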
diff --git a/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java
index d203a2b..168fec2 100644
--- a/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java
+++ b/src/main/java/com/zdjizhi/utils/general/SnowflakeId.java
@@ -30,12 +30,12 @@ public class SnowflakeId {
/**
* number of bits used by the worker id
*/
- private final long workerIdBits = 7L;
+ private final long workerIdBits = 8L;
/**
* number of bits used by the data center id
*/
- private final long dataCenterIdBits = 6L;
+ private final long dataCenterIdBits = 5L;
/**
* maximum supported worker id; the result is 63 (this shift trick quickly computes the largest decimal number representable by a given count of binary digits)
@@ -74,12 +74,12 @@ public class SnowflakeId {
private final long sequenceMask = -1L ^ (-1L << sequenceBits);
/**
- * worker machine ID (0~127)
+ * worker machine ID (0~255)
*/
private long workerId;
/**
- * data center ID (0~63)
+ * data center ID (0~31)
*/
private long dataCenterId;
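Note: the two hunks above re-split the node-identifier bits, 8 for the worker id and 5 for the data center id (13 bits total, as before). The small check below, assuming the values from this diff, shows how the commented ranges follow from the usual mask formula.

// Quick check of the Snowflake bit arithmetic after this change.
public class SnowflakeBitsSketch {
    public static void main(String[] args) {
        long workerIdBits = 8L;     // was 7L
        long dataCenterIdBits = 5L; // was 6L

        long maxWorkerId = ~(-1L << workerIdBits);         // same as -1L ^ (-1L << 8) = 255
        long maxDataCenterId = ~(-1L << dataCenterIdBits); // same as -1L ^ (-1L << 5) = 31

        System.out.println("worker id range: 0~" + maxWorkerId);          // 0~255
        System.out.println("data center id range: 0~" + maxDataCenterId); // 0~31
    }
}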
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
index 9fada7b..7dc806e 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFunction.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
@@ -6,7 +6,6 @@ import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.jayway.jsonpath.InvalidPathException;
import com.jayway.jsonpath.JsonPath;
-import com.zdjizhi.common.DefaultProConfig;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.FormatUtils;
import com.zdjizhi.utils.IpLookup;
@@ -34,12 +33,12 @@ class TransFunction {
* IP location library helper class
*/
private static IpLookup ipLookup = new IpLookup.Builder(false)
- .loadDataFileV4(FlowWriteConfig.IP_LIBRARY + "ip_v4.mmdb")
- .loadDataFileV6(FlowWriteConfig.IP_LIBRARY + "ip_v6.mmdb")
- .loadDataFilePrivateV4(FlowWriteConfig.IP_LIBRARY + "ip_private_v4.mmdb")
- .loadDataFilePrivateV6(FlowWriteConfig.IP_LIBRARY + "ip_private_v6.mmdb")
- .loadAsnDataFile(FlowWriteConfig.IP_LIBRARY + "asn_v4.mmdb")
- .loadAsnDataFileV6(FlowWriteConfig.IP_LIBRARY + "asn_v6.mmdb")
+ .loadDataFileV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_v4.mmdb")
+ .loadDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_v6.mmdb")
+ .loadDataFilePrivateV4(FlowWriteConfig.TOOLS_LIBRARY + "ip_private_v4.mmdb")
+ .loadDataFilePrivateV6(FlowWriteConfig.TOOLS_LIBRARY + "ip_private_v6.mmdb")
+ .loadAsnDataFile(FlowWriteConfig.TOOLS_LIBRARY + "asn_v4.mmdb")
+ .loadAsnDataFileV6(FlowWriteConfig.TOOLS_LIBRARY + "asn_v6.mmdb")
.build();
/**
@@ -93,9 +92,9 @@ class TransFunction {
*/
static String radiusMatch(String ip) {
String account = HBaseUtils.getAccount(ip.trim());
- if (StringUtil.isBlank(account)) {
- logger.warn("HashMap get account is null, Ip is :" + ip);
- }
+// if (StringUtil.isBlank(account)) {
+// logger.warn("HashMap get account is null, Ip is :" + ip);
+// }
return account;
}
diff --git a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
index 60b3d09..710e4b9 100644
--- a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
+++ b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java
@@ -2,7 +2,6 @@ package com.zdjizhi.utils.hbase;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.DefaultProConfig;
import com.zdjizhi.common.FlowWriteConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@@ -46,7 +45,7 @@ public class HBaseUtils {
*/
private HBaseUtils() {
zookeeperIp = FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS;
- hBaseTable = DefaultProConfig.HBASE_TABLE_NAME;
+ hBaseTable = FlowWriteConfig.HBASE_TABLE_NAME;
//get the connection
getConnection();
//pull all
diff --git a/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
new file mode 100644
index 0000000..b09eedb
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/kafka/CertUtils.java
@@ -0,0 +1,36 @@
+package com.zdjizhi.utils.kafka;
+
+import com.zdjizhi.common.FlowWriteConfig;
+import org.apache.kafka.common.config.SslConfigs;
+
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description:
+ * @date 2021/9/6 10:37
+ */
+class CertUtils {
+ static void chooseCert(String type, Properties properties) {
+ switch (type) {
+ case "SSL":
+ properties.put("security.protocol", "SSL");
+ properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+ properties.put("ssl.keystore.location", FlowWriteConfig.TOOLS_LIBRARY + "keystore.jks");
+ properties.put("ssl.keystore.password", FlowWriteConfig.KAFKA_PIN);
+ properties.put("ssl.truststore.location", FlowWriteConfig.TOOLS_LIBRARY + "truststore.jks");
+ properties.put("ssl.truststore.password", FlowWriteConfig.KAFKA_PIN);
+ properties.put("ssl.key.password", FlowWriteConfig.KAFKA_PIN);
+ break;
+ case "SASL":
+ properties.put("security.protocol", "SASL_PLAINTEXT");
+ properties.put("sasl.mechanism", "PLAIN");
+ properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ + FlowWriteConfig.KAFKA_USER + " password=" + FlowWriteConfig.KAFKA_PIN + ";");
+ break;
+ default:
+ }
+
+ }
+}
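Note: a hedged usage sketch for the new helper. chooseCert mutates the Properties it is given according to the configured protocol; the caller below is illustrative, and its JAAS line quotes the username and password values, which is the form Kafka's documentation uses (whether the unquoted concatenation in chooseCert parses depends on the JAAS tokenizer).

import java.util.Properties;

// Illustrative caller; in this repository Consumer and Producer pass their own Properties to CertUtils.
public class CertUsageSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.44.12:9091"); // placeholder broker address

        // Equivalent of CertUtils.chooseCert("SASL", props), written out with quoted JAAS values:
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"admin\" password=\"galaxy2019\";");

        System.out.println(props);
    }
}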
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
index c220064..1036fe9 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
@@ -4,6 +4,7 @@ import com.zdjizhi.common.FlowWriteConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;
@@ -25,10 +26,8 @@ public class Consumer {
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- /*
- * kafka rate-limiting config-20201117
- */
-// properties.put(ConsumerConfig.CLIENT_ID_CONFIG, FlowWriteConfig.CONSUMER_CLIENT_ID);
+ CertUtils.chooseCert(FlowWriteConfig.KAFKA_SOURCE_PROTOCOL,properties);
+
return properties;
}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/Producer.java
index 077ae71..e1a5b22 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/Producer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/Producer.java
@@ -1,12 +1,12 @@
package com.zdjizhi.utils.kafka;
-import com.zdjizhi.common.DefaultProConfig;
import com.zdjizhi.common.FlowWriteConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import java.util.Optional;
import java.util.Properties;
/**
@@ -20,21 +20,17 @@ public class Producer {
private static Properties createProducerConfig() {
Properties properties = new Properties();
properties.put("bootstrap.servers", FlowWriteConfig.OUTPUT_KAFKA_SERVERS);
-// properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-// properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("acks", FlowWriteConfig.PRODUCER_ACK);
- properties.put("retries", DefaultProConfig.RETRIES);
- properties.put("linger.ms", DefaultProConfig.LINGER_MS);
- properties.put("request.timeout.ms", DefaultProConfig.REQUEST_TIMEOUT_MS);
- properties.put("batch.size", DefaultProConfig.BATCH_SIZE);
- properties.put("buffer.memory", DefaultProConfig.BUFFER_MEMORY);
- properties.put("max.request.size", DefaultProConfig.MAX_REQUEST_SIZE);
-
- /**
- * kafka rate-limiting config-20201117
- */
-// properties.put(ProducerConfig.CLIENT_ID_CONFIG, FlowWriteConfig.PRODUCER_CLIENT_ID);
-// properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, FlowWriteConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
+ properties.put("retries", FlowWriteConfig.RETRIES);
+ properties.put("linger.ms", FlowWriteConfig.LINGER_MS);
+ properties.put("request.timeout.ms", FlowWriteConfig.REQUEST_TIMEOUT_MS);
+ properties.put("batch.size", FlowWriteConfig.BATCH_SIZE);
+ properties.put("buffer.memory", FlowWriteConfig.BUFFER_MEMORY);
+ properties.put("max.request.size", FlowWriteConfig.MAX_REQUEST_SIZE);
+ properties.put("compression.type", FlowWriteConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
+
+ CertUtils.chooseCert(FlowWriteConfig.KAFKA_SINK_PROTOCOL, properties);
+
return properties;
}
@@ -43,9 +39,10 @@ public class Producer {
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
FlowWriteConfig.OUTPUT_KAFKA_TOPIC,
new SimpleStringSchema(),
- createProducerConfig());
+ createProducerConfig(), Optional.empty());
kafkaProducer.setLogFailuresOnly(false);
+
// kafkaProducer.setWriteTimestampToKafka(true);
return kafkaProducer;
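Note: the constructor change above is behavioural, not cosmetic. In the Flink Kafka connector, the three-argument FlinkKafkaProducer constructor defaults to FlinkFixedPartitioner (each parallel sink instance targets a single partition), while the overload taking an Optional partitioner, when given Optional.empty(), leaves partitioning to the Kafka producer itself. A short sketch of the two variants, with placeholder topic and broker values:

import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

// Sketch only: topic name and bootstrap servers are placeholders.
public class ProducerPartitioningSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.44.12:9091");

        // Old form: FlinkFixedPartitioner, each parallel sink subtask writes to one partition.
        FlinkKafkaProducer<String> fixed = new FlinkKafkaProducer<String>(
                "SESSION-RECORD-COMPLETED", new SimpleStringSchema(), props);

        // New form: Optional.empty() hands partitioning to Kafka's own partitioner.
        FlinkKafkaProducer<String> kafkaPartitioned = new FlinkKafkaProducer<String>(
                "SESSION-RECORD-COMPLETED", new SimpleStringSchema(), props, Optional.empty());

        fixed.setLogFailuresOnly(false);
        kafkaPartitioned.setLogFailuresOnly(false);
    }
}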
diff --git a/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java b/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java
index ebf4368..9efbd46 100644
--- a/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java
+++ b/src/main/java/com/zdjizhi/utils/zookeeper/ZookeeperUtils.java
@@ -19,6 +19,7 @@ import java.util.concurrent.CountDownLatch;
*/
public class ZookeeperUtils implements Watcher {
private static final Log logger = LogFactory.get();
+ private static final int ID_MAX = 255;
private ZooKeeper zookeeper;
@@ -46,7 +47,7 @@ public class ZookeeperUtils implements Watcher {
connectZookeeper(zookeeperIp);
Stat stat = zookeeper.exists(path, true);
workerId = Integer.parseInt(getNodeDate(path));
- if (workerId > 63) {
+ if (workerId > ID_MAX) {
workerId = 0;
zookeeper.setData(path, "1".getBytes(), stat.getVersion());
} else {
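Note: the new ID_MAX constant keeps the ZooKeeper-assigned worker id inside the widened 8-bit range used by SnowflakeId above. Extracted as a pure function, the rollover rule visible in this hunk is roughly the sketch below (the surrounding increment/update logic falls outside the diff context).

// Sketch of the worker-id rollover guard applied to the counter read back from ZooKeeper.
public class WorkerIdRolloverSketch {
    private static final int ID_MAX = 255;

    // Ids beyond ID_MAX wrap back to 0; otherwise the stored value is used as-is.
    static int boundedWorkerId(int storedValue) {
        return storedValue > ID_MAX ? 0 : storedValue;
    }

    public static void main(String[] args) {
        System.out.println(boundedWorkerId(17));  // 17
        System.out.println(boundedWorkerId(256)); // 0
    }
}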
diff --git a/src/test/java/com/zdjizhi/KafkaLogSend.java b/src/test/java/com/zdjizhi/KafkaLogSend.java
deleted file mode 100644
index 5c3feb3..0000000
--- a/src/test/java/com/zdjizhi/KafkaLogSend.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package com.zdjizhi;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.DefaultProConfig;
-import org.apache.kafka.clients.producer.*;
-
-import java.util.Properties;
-
-/**
- * Class that writes logs produced by NTC system configuration into the data center
- *
- * @author Administrator
- * @create 2018-08-13 15:11
- */
-
-public class KafkaLogSend {
- private static final Log logger = LogFactory.get();
-
- /**
- * kafka producer, used to send messages to kafka
- */
- private static org.apache.kafka.clients.producer.Producer<String, String> kafkaProducer;
-
- /**
- * kafka producer adapter (singleton), used to proxy the kafka producer when sending messages
- */
- private static KafkaLogSend kafkaLogSend;
-
- private KafkaLogSend() {
- initKafkaProducer();
- }
-
- public static KafkaLogSend getInstance() {
- if (kafkaLogSend == null) {
- kafkaLogSend = new KafkaLogSend();
- }
- return kafkaLogSend;
- }
-
-
- public void sendMessage(String message) {
-// for (String value : list) {
- kafkaProducer.send(new ProducerRecord<>("test", message), new Callback() {
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (exception != null) {
- logger.error("Exception while writing to test", exception);
- }
- }
- });
-// }
-// kafkaProducer.flush();
- logger.debug("Log sent to National Center successfully!!!!!");
- }
-
- /**
- * Initialize the kafka message producer from the producer configuration; initialized only once
- */
- private void initKafkaProducer() {
- Properties properties = new Properties();
- properties.put("bootstrap.servers", "192.168.44.33:9093,192.168.44.34:9093,192.168.44.35:9093");
- properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- properties.put("acks", "1");
- properties.put("retries", DefaultProConfig.RETRIES);
- properties.put("linger.ms", DefaultProConfig.LINGER_MS);
- properties.put("request.timeout.ms", DefaultProConfig.REQUEST_TIMEOUT_MS);
- properties.put("batch.size", DefaultProConfig.BATCH_SIZE);
- properties.put("buffer.memory", DefaultProConfig.BUFFER_MEMORY);
- properties.put("max.request.size", DefaultProConfig.MAX_REQUEST_SIZE);
-
- properties.put("security.protocol", "SSL");
- properties.put("ssl.keystore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.keystore.jks");
- properties.put("ssl.keystore.password", "ceiec2019");
- properties.put("ssl.truststore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.truststore.jks");
- properties.put("ssl.truststore.password", "ceiec2019");
- properties.put("ssl.key.password", "ceiec2019");
-
-
- /*
- * kafka rate-limiting config-20201117
- */
-// properties.put(ProducerConfig.CLIENT_ID_CONFIG, VoipRelationConfig.PRODUCER_CLIENT_ID);
-// properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, VoipRelationConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
-
-
- kafkaProducer = new KafkaProducer<>(properties);
- }
-
-
-}
diff --git a/src/test/java/com/zdjizhi/KafkaTest.java b/src/test/java/com/zdjizhi/KafkaTest.java
index 3bb6d1c..4b034a3 100644
--- a/src/test/java/com/zdjizhi/KafkaTest.java
+++ b/src/test/java/com/zdjizhi/KafkaTest.java
@@ -3,6 +3,7 @@ package com.zdjizhi;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.kafka.clients.producer.*;
+import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;
@@ -17,7 +18,7 @@ public class KafkaTest {
public static void main(String[] args) {
Properties properties = new Properties();
- properties.put("bootstrap.servers", "192.168.44.33:9093,192.168.44.34:9093,192.168.44.35:9093");
+ properties.put("bootstrap.servers", "192.168.44.12:9091");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("acks", "1");
@@ -30,12 +31,13 @@ public class KafkaTest {
properties.put("security.protocol", "SSL");
// properties.put("ssl.keystore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.keystore.jks");
- properties.put("ssl.keystore.location", "/usr/ca/client/client.keystore.jks");
- properties.put("ssl.keystore.password", "ceiec2019");
+ properties.put("ssl.keystore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\keystore.jks");
+ properties.put("ssl.keystore.password", "galaxy2019");
// properties.put("ssl.truststore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\kafka\\client.truststore.jks");
- properties.put("ssl.truststore.location", "/usr/ca/trust/client.truststore.jks");
- properties.put("ssl.truststore.password", "ceiec2019");
- properties.put("ssl.key.password", "ceiec2019");
+ properties.put("ssl.truststore.location", "D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\truststore.jks");
+ properties.put("ssl.truststore.password", "galaxy2019");
+ properties.put("ssl.key.password", "galaxy2019");
+ properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
Producer<String, String> producer = new KafkaProducer<String, String>(properties);