summaryrefslogtreecommitdiff
path: root/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/com/galaxy/tsg/Toptask.java333
-rw-r--r--src/main/java/com/galaxy/tsg/config/CommonConfig.java53
-rw-r--r--src/main/java/com/galaxy/tsg/config/CommonConfigurations.java (renamed from src/main/java/com/galaxy/tsg/config/commonConfigurations.java)4
-rw-r--r--src/main/java/com/galaxy/tsg/config/commonConfig.java52
-rw-r--r--src/main/java/com/galaxy/tsg/function/FlatMapFunction.java89
-rw-r--r--src/main/java/com/galaxy/tsg/function/MetricsAggregationReduce.java (renamed from src/main/java/com/galaxy/tsg/function/metricsAggregationReduce.java)6
-rw-r--r--src/main/java/com/galaxy/tsg/function/MetricsCalculate.java (renamed from src/main/java/com/galaxy/tsg/function/metricsCalculate.java)28
-rw-r--r--src/main/java/com/galaxy/tsg/function/TopnHotItems.java (renamed from src/main/java/com/galaxy/tsg/function/topnHotItems.java)36
-rw-r--r--src/main/java/com/galaxy/tsg/pojo/ByteResultEntity.java (renamed from src/main/java/com/galaxy/tsg/pojo/byteResultEntity.java)4
-rw-r--r--src/main/java/com/galaxy/tsg/pojo/MetricsEntity.java (renamed from src/main/java/com/galaxy/tsg/pojo/metricsEntity.java)2
-rw-r--r--src/main/java/com/galaxy/tsg/pojo/PacketResultEntity.java (renamed from src/main/java/com/galaxy/tsg/pojo/packetResultEntity.java)4
-rw-r--r--src/main/java/com/galaxy/tsg/pojo/ResultEntity.java56
-rw-r--r--src/main/java/com/galaxy/tsg/pojo/SessionEntity.java200
-rw-r--r--src/main/java/com/galaxy/tsg/pojo/SessionResultEntity.java (renamed from src/main/java/com/galaxy/tsg/pojo/sessionResultEntity.java)4
-rw-r--r--src/main/java/com/galaxy/tsg/pojo/TransformEntity.java (renamed from src/main/java/com/galaxy/tsg/pojo/transformEntity.java)57
-rw-r--r--src/main/java/com/galaxy/tsg/pojo/resultEntity.java56
-rw-r--r--src/main/java/com/galaxy/tsg/pojo/sessionEntity.java204
-rw-r--r--src/main/java/com/galaxy/tsg/selector/GroupBySelector.java48
-rw-r--r--src/main/java/com/galaxy/tsg/selector/OrderBySelector.java13
-rw-r--r--src/main/java/com/galaxy/tsg/util/KafkaUtils.java114
-rw-r--r--src/main/java/com/galaxy/tsg/util/TimestampDeserializationSchema.java48
-rw-r--r--src/main/java/com/galaxy/tsg/util/kafkaUtils.java109
22 files changed, 797 insertions, 723 deletions
diff --git a/src/main/java/com/galaxy/tsg/Toptask.java b/src/main/java/com/galaxy/tsg/Toptask.java
index a9cb6d2..6926c3c 100644
--- a/src/main/java/com/galaxy/tsg/Toptask.java
+++ b/src/main/java/com/galaxy/tsg/Toptask.java
@@ -1,19 +1,18 @@
package com.galaxy.tsg;
-import com.alibaba.fastjson2.JSON;
-import com.galaxy.tsg.function.metricsAggregationReduce;
-import com.galaxy.tsg.function.metricsCalculate;
-import com.galaxy.tsg.function.topnHotItems;
-import com.galaxy.tsg.pojo.resultEntity;
-import com.galaxy.tsg.pojo.sessionEntity;
-import com.galaxy.tsg.pojo.transformEntity;
-import com.zdjizhi.utils.StringUtil;
+import com.galaxy.tsg.config.CommonConfig;
+import com.galaxy.tsg.function.*;
+import com.galaxy.tsg.pojo.ResultEntity;
+import com.galaxy.tsg.pojo.SessionEntity;
+import com.galaxy.tsg.pojo.TransformEntity;
+import com.galaxy.tsg.selector.GroupBySelector;
+import com.galaxy.tsg.selector.OrderBySelector;
+import com.geedgenetworks.utils.StringUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -23,249 +22,125 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
+import java.util.Objects;
-import static com.galaxy.tsg.config.commonConfig.*;
-import static com.galaxy.tsg.util.kafkaUtils.getKafkaConsumer;
-import static com.galaxy.tsg.util.kafkaUtils.getKafkaSink;
+import static com.galaxy.tsg.config.CommonConfig.*;
+import static com.galaxy.tsg.util.KafkaUtils.getKafkaConsumer;
+import static com.galaxy.tsg.util.KafkaUtils.getKafkaSink;
public class Toptask {
private static final Logger LOG = LoggerFactory.getLogger(Toptask.class);
public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ ParameterTool serviceConfig = ParameterTool.fromPropertiesFile(args[0]);
+ Configuration configurationService = serviceConfig.getConfiguration();
+ // global register
+ env.getConfig().setGlobalJobParameters(configurationService);
+ WatermarkStrategy<SessionEntity> strategyForSession = WatermarkStrategy
+ .<SessionEntity>forBoundedOutOfOrderness(Duration.ofSeconds(configurationService.get(CommonConfig.WATERMARK_TIME)))
+ .withTimestampAssigner((sessionEntity, timestamp) -> sessionEntity.getRecv_time());
- DataStream<String> sourceForSession = env.addSource(getKafkaConsumer(KAFKA_CONSUMER_TOPIC)).setParallelism(KAFKA_CONSUMER_PARALLELISM);
- WatermarkStrategy<transformEntity> strategyForSession = WatermarkStrategy
- .<transformEntity>forBoundedOutOfOrderness(Duration.ofSeconds(WATERMARK_TIME))
- .withTimestampAssigner((transformEntity, timestamp) -> transformEntity.getTimestamp()*1000);
-
-
- SingleOutputStreamOperator<transformEntity> inputForSession = sourceForSession.map(new MapFunction<String, transformEntity>() {
+ DataStream<SessionEntity> sourceForSession = env.addSource(getKafkaConsumer(configurationService)).setParallelism(configurationService.get(CommonConfig.KAFKA_CONSUMER_PARALLELISM)).assignTimestampsAndWatermarks(strategyForSession);;
+ SingleOutputStreamOperator<TransformEntity> inputForSession = sourceForSession.flatMap(new FlatMapFunction()
+ ).filter(new FilterFunction<TransformEntity>() {
@Override
- public transformEntity map(String message) {
- transformEntity transformEntity = new transformEntity();
- try {
- sessionEntity sessionEntity = JSON.parseObject(message, com.galaxy.tsg.pojo.sessionEntity.class);
- transformEntity.setServer_ip(sessionEntity.getCommon_server_ip());
- transformEntity.setClient_ip(sessionEntity.getCommon_client_ip());
-
- transformEntity.setSubscriber_id(sessionEntity.getCommon_subscriber_id());
- transformEntity.setFqdn(sessionEntity.getCommon_server_fqdn());
- transformEntity.setExternal_ip(sessionEntity.getCommon_external_ip());
- transformEntity.setInternal_ip(sessionEntity.getCommon_internal_ip());
- transformEntity.setDomain(sessionEntity.getHttp_domain());
- transformEntity.setDevice_group(sessionEntity.getCommon_device_group());
- transformEntity.setDevice_id(sessionEntity.getCommon_device_id());
- transformEntity.setData_center(sessionEntity.getCommon_data_center());
- transformEntity.setVsys_id(sessionEntity.getCommon_vsys_id());
- transformEntity.setTimestamp(sessionEntity.getCommon_recv_time());
- transformEntity.setSessions(sessionEntity.getCommon_sessions());
-
- transformEntity.setL4_protocol(sessionEntity.getCommon_l4_protocol());
-
-
- if ((8L & sessionEntity.getCommon_flags()) == 8L) {
-
- transformEntity.setOut_bytes(sessionEntity.getCommon_c2s_byte_num());
- transformEntity.setOut_pkts(sessionEntity.getCommon_c2s_pkt_num());
- transformEntity.setIn_bytes(sessionEntity.getCommon_s2c_byte_num());
- transformEntity.setIn_pkts(sessionEntity.getCommon_s2c_pkt_num());
-
- } else {
- transformEntity.setOut_bytes(sessionEntity.getCommon_s2c_byte_num());
- transformEntity.setOut_pkts(sessionEntity.getCommon_s2c_pkt_num());
- transformEntity.setIn_bytes(sessionEntity.getCommon_c2s_byte_num());
- transformEntity.setIn_pkts(sessionEntity.getCommon_c2s_pkt_num());
-
+ public boolean filter(TransformEntity entity) throws Exception {
- }
-
-
- } catch (Exception e) {
- LOG.error("Entity Parsing ERROR");
- transformEntity.setIfError(1);
- }
- return transformEntity;
- }
- }).filter(new FilterFunction<transformEntity>() {
- @Override
- public boolean filter(transformEntity entity) throws Exception {
-
- return entity.ifError != 1;
+ return entity.getIfError() != 1;
}
});
-
- //clientip聚合TOP
-
- SingleOutputStreamOperator<transformEntity> clientipdStream = inputForSession.filter(new FilterFunction<transformEntity>() {
- @Override
- public boolean filter(transformEntity value) throws Exception {
- return "IPv6_TCP".equals(value.getL4_protocol()) || "IPv4_TCP".equals(value.getL4_protocol());
- }
- }).assignTimestampsAndWatermarks(strategyForSession);
-
- SingleOutputStreamOperator<resultEntity> windowedStream = clientipdStream.keyBy(new groupBySelector("client_ip"))
- .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
- .reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("client_ip");;
- DataStream<String> Stream = windowedStream.keyBy(new oneKeySelector())
- .process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM);
- Stream.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM);
-
-
- //serverip聚合TOP
-
-
- SingleOutputStreamOperator<transformEntity> serveripdStream = inputForSession.filter(new FilterFunction<transformEntity>() {
- @Override
- public boolean filter(transformEntity value) throws Exception {
- return "IPv6_TCP".equals(value.getL4_protocol()) || "IPv4_TCP".equals(value.getL4_protocol());
- }
- }).assignTimestampsAndWatermarks(strategyForSession);
-
- SingleOutputStreamOperator<resultEntity> windowedStreamForServerIp = serveripdStream.keyBy(new groupBySelector("server_ip"))
- .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
- .reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("server_ip");;
- DataStream<String> StreamForServerIp = windowedStreamForServerIp.keyBy(new oneKeySelector())
- .process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM);
- StreamForServerIp.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM);
-
-
- //common_internal_ip聚合TOP
- SingleOutputStreamOperator<transformEntity> internalStream = inputForSession.filter(new FilterFunction<transformEntity>() {
- @Override
- public boolean filter(transformEntity value) throws Exception {
- return StringUtil.isNotEmpty(value.getInternal_ip());
- }
- }).assignTimestampsAndWatermarks(strategyForSession);
-
- SingleOutputStreamOperator<resultEntity> windowedStreamForInternal = internalStream.keyBy(new groupBySelector("internal_ip"))
- .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
- .reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("internal_ip");;
- DataStream<String> StreamForInternal = windowedStreamForInternal.keyBy(new oneKeySelector())
- .process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM);
- StreamForInternal.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM);
-
- //common_external_ip聚合TOP
-
- SingleOutputStreamOperator<transformEntity> externalStream = inputForSession.filter(new FilterFunction<transformEntity>() {
- @Override
- public boolean filter(transformEntity value) throws Exception {
- return StringUtil.isNotEmpty(value.getExternal_ip());
- }
- }).assignTimestampsAndWatermarks(strategyForSession);
-
- SingleOutputStreamOperator<resultEntity> windowedStreamForExternal = externalStream.keyBy(new groupBySelector("external_ip"))
- .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
- .reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("external_ip");;
- DataStream<String> StreamForExternal = windowedStreamForExternal.keyBy(new oneKeySelector())
- .process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM);
- StreamForExternal.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM);
-
- //http_domain聚合TOP
-
- SingleOutputStreamOperator<transformEntity> domainStream = inputForSession.filter(new FilterFunction<transformEntity>() {
- @Override
- public boolean filter(transformEntity value) throws Exception {
- return StringUtil.isNotEmpty(value.getDomain());
- }
- }).assignTimestampsAndWatermarks(strategyForSession);
-
- SingleOutputStreamOperator<resultEntity> windowedStreamForDomain = domainStream.keyBy(new groupBySelector("server_domain"))
- .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
- .reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("server_domain");;
- DataStream<String> StreamForDomain = windowedStreamForDomain.keyBy(new oneKeySelector())
- .process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM);
- StreamForDomain.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM);
-
- SingleOutputStreamOperator<transformEntity> userStream = inputForSession.filter(new FilterFunction<transformEntity>() {
- @Override
- public boolean filter(transformEntity value) throws Exception {
- return StringUtil.isNotEmpty(value.getSubscriber_id());
- }
- }).assignTimestampsAndWatermarks(strategyForSession);
-
- //common_subscriber_id聚合TOP
- SingleOutputStreamOperator<resultEntity> windowedStreamForUser = userStream.keyBy(new groupBySelector("subscriber_id"))
- .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
- .reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("subscriber_id");;
- DataStream<String> StreamForUser = windowedStreamForUser.keyBy(new oneKeySelector())
- .process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM);
- StreamForUser.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM);
-
- SingleOutputStreamOperator<transformEntity> fqdnStream = inputForSession.filter(new FilterFunction<transformEntity>() {
- @Override
- public boolean filter(transformEntity value) throws Exception {
- return StringUtil.isNotEmpty(value.getFqdn());
- }
- }).assignTimestampsAndWatermarks(strategyForSession);
-
- SingleOutputStreamOperator<resultEntity> windowedStreamForFqdn = fqdnStream.keyBy(new groupBySelector("server_fqdn"))
- .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE)))
- .reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("server_fqdn");
- DataStream<String> StreamForFqdn = windowedStreamForFqdn.keyBy(new oneKeySelector())
- .process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM);
- StreamForFqdn.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM);
-
- env.execute(JOB_NAME);
+ buildJobTuple(configurationService,inputForSession,"client_ip");
+ buildJobTuple(configurationService,inputForSession,"server_ip");
+ buildJobTuple(configurationService,inputForSession,"internal_ip");
+ buildJobTuple(configurationService,inputForSession,"external_ip");
+ buildJobTuple(configurationService,inputForSession,"server_domain");
+ buildJobTuple(configurationService,inputForSession,"subscriber_id");
+ buildJobTuple(configurationService,inputForSession,"server_fqdn");
+ env.execute(configurationService.getString(JOB_NAME));
}
- public static class groupBySelector implements KeySelector<transformEntity, Tuple5<String, Long, String, String, String>> {
- public String key;
+ private static void buildJobTuple(Configuration configurationService, SingleOutputStreamOperator<TransformEntity> inputForSession, String key) {
- public groupBySelector(String key) {
- this.key = key;
- }
-
- @Override
- public Tuple5<String, Long, String, String, String> getKey(transformEntity transformEntity) throws Exception {
-
- Tuple5<String, Long, String, String, String> tuple = null;
- transformEntity.setKey_by(key);
- switch (key) {
- case "client_ip":
- tuple = new Tuple5<>(transformEntity.getClient_ip(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
- break;
- case "server_ip":
- tuple = new Tuple5<>(transformEntity.getServer_ip(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
- break;
- case "internal_ip":
- tuple = new Tuple5<>(transformEntity.getInternal_ip(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
- break;
- case "external_ip":
- tuple = new Tuple5<>(transformEntity.getExternal_ip(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
- break;
- case "server_domain":
- tuple = new Tuple5<>(transformEntity.getDomain(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
- break;
+ SingleOutputStreamOperator<TransformEntity> event = null;
+ switch (key) {
+ case "client_ip":
+ event= inputForSession.filter(new FilterFunction<TransformEntity>() {
+ @Override
+ public boolean filter(TransformEntity value) throws Exception {
+ return StringUtil.isNotEmpty(value.getClient_ip());
+ }
+ });
+ break;
+ case "server_ip":
+ event= inputForSession.filter(new FilterFunction<TransformEntity>() {
+ @Override
+ public boolean filter(TransformEntity value) throws Exception {
+ return StringUtil.isNotEmpty(value.getServer_ip());
+ }
+ });
+ break;
+ case "internal_ip":
+ event= inputForSession.filter(new FilterFunction<TransformEntity>() {
+ @Override
+ public boolean filter(TransformEntity value) throws Exception {
+ return StringUtil.isNotEmpty(value.getInternal_ip());
+ }
+ });
+ break;
+ case "external_ip":
+ event= inputForSession.filter(new FilterFunction<TransformEntity>() {
+ @Override
+ public boolean filter(TransformEntity value) throws Exception {
+ return StringUtil.isNotEmpty(value.getExternal_ip());
+ }
+ });
+ break;
+ case "server_domain":
+ event= inputForSession.filter(new FilterFunction<TransformEntity>() {
+ @Override
+ public boolean filter(TransformEntity value) throws Exception {
+ return StringUtil.isNotEmpty(value.getDomain());
+ }
+ });
+ break;
+ case "subscriber_id":
+ event= inputForSession.filter(new FilterFunction<TransformEntity>() {
+ @Override
+ public boolean filter(TransformEntity value) throws Exception {
+ return StringUtil.isNotEmpty(value.getSubscriber_id());
+ }
+ });
+ break;
+ case "server_fqdn":
+ event= inputForSession.filter(new FilterFunction<TransformEntity>() {
+ @Override
+ public boolean filter(TransformEntity value) throws Exception {
+ return StringUtil.isNotEmpty(value.getFqdn());
+ }
+ });
+ break;
- case "subscriber_id":
- tuple = new Tuple5<>(transformEntity.getSubscriber_id(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
- break;
- case "server_fqdn":
- tuple = new Tuple5<>(transformEntity.getFqdn(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
- break;
+ default:
+ break;
+ }
+ SingleOutputStreamOperator<ResultEntity> windowedStreamForFqdn = Objects.requireNonNull(event).keyBy(new GroupBySelector(key))
+ .window(TumblingEventTimeWindows.of(Time.minutes(configurationService.getInteger(CommonConfig.WINDOW_TIME_MINUTE))))
+ .reduce(new MetricsAggregationReduce(), new MetricsCalculate()).setParallelism(configurationService.getInteger(CommonConfig.TASK_PARALLELISM)).name(key);
+ DataStream<String> StreamForKey = windowedStreamForFqdn.keyBy(new OrderBySelector())
+ .process(new TopnHotItems(configurationService.getInteger(CommonConfig.TOP_LIMIT))).setParallelism(configurationService.getInteger(CommonConfig.ORDERBY_PARALLELISM));
+ StreamForKey.addSink(getKafkaSink(configurationService)).setParallelism(configurationService.getInteger(CommonConfig.SINK_PARALLELISM));
- default:
- }
- return tuple;
}
- }
-
- public static class oneKeySelector implements KeySelector<resultEntity, Tuple1<String>> {
- @Override
- public Tuple1<String> getKey(resultEntity entity) throws Exception {
- return new Tuple1<>(entity.getOrder_by());
- }
- }
} \ No newline at end of file
diff --git a/src/main/java/com/galaxy/tsg/config/CommonConfig.java b/src/main/java/com/galaxy/tsg/config/CommonConfig.java
new file mode 100644
index 0000000..db1896a
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/config/CommonConfig.java
@@ -0,0 +1,53 @@
+package com.galaxy.tsg.config;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Created by wk on 2021/1/6.
+ */
+public class CommonConfig {
+
+ public static final ConfigOption<String> KAFKA_CONSUMER_BROKER = ConfigOptions.key("kafka.consumer.broker").stringType().defaultValue("");
+ public static final ConfigOption<String> KAFKA_CONSUMER_GROUP_ID = ConfigOptions.key("kafka.consumer.group.id").stringType().defaultValue("");
+ public static final ConfigOption<String> KAFKA_CONSUMER_TOPIC = ConfigOptions.key("kafka.consumer.topic").stringType().defaultValue("");
+ public static final ConfigOption<Integer> KAFKA_CONSUMER_PARALLELISM = ConfigOptions.key("kafka.consumer.parallelism").intType().defaultValue(0);
+ public static final ConfigOption<String> KAFKA_CONSUMER_SESSION_TIMEOUT_MS= ConfigOptions.key("kafka.consumer.session.timeout.ms").stringType().defaultValue("60000");
+ public static final ConfigOption<String> KAFKA_CONSUMER_MAX_POLL_RECORD= ConfigOptions.key("kafka.consumer.max.poll.records").stringType().defaultValue("3000");
+ public static final ConfigOption<String> KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES= ConfigOptions.key("kafka.consumer.max.partition.fetch.bytes").stringType().defaultValue("31457280");
+
+
+
+ public static final ConfigOption<String> KAFKA_PRODUCER_TOPIC = ConfigOptions.key("kafka.producer.topic").stringType().defaultValue("");
+ public static final ConfigOption<String> KAFKA_PRODUCER_RETRIES = ConfigOptions.key("kafka.producer.retries").stringType().defaultValue("1");
+ public static final ConfigOption<String> KAFKA_PRODUCER_LINGER_MS = ConfigOptions.key("kafka.producer.linger.ms").stringType().defaultValue("10");
+ public static final ConfigOption<String> KAFKA_PRODUCER_REQUEST_TIMEOUT_MS = ConfigOptions.key("kafka.producer.request.timeout.ms").stringType().defaultValue("30000");
+ public static final ConfigOption<String> KAFKA_PRODUCER_BATCH_SIZE = ConfigOptions.key("kafka.producer.batch.size").stringType().defaultValue("262144");
+ public static final ConfigOption<String> KAFKA_PRODUCER_BUFFER_MEMORY = ConfigOptions.key("kafka.producer.buffer.memory").stringType().defaultValue("134217728");
+ public static final ConfigOption<String> KAFKA_PRODUCER_MAX_REQUEST_SIZE = ConfigOptions.key("kafka.producer.max.request.size").stringType().defaultValue("10485760");
+ public static final ConfigOption<String> KAFKA_PRODUCER_COMPRESSION_TYPE = ConfigOptions.key("kafka.producer.compression.type").stringType().defaultValue("none");
+ public static final ConfigOption<String> KAFKA_PRODUCER_BROKER = ConfigOptions.key("kafka.producer.broker").stringType().defaultValue("");
+
+
+ public static final ConfigOption<String> JOB_NAME = ConfigOptions.key("job.name").stringType().defaultValue("");
+ public static final ConfigOption<Integer> TASK_PARALLELISM = ConfigOptions.key("task.parallelism").intType().defaultValue(0);
+ public static final ConfigOption<Integer> ORDERBY_PARALLELISM = ConfigOptions.key("orderby.parallelism").intType().defaultValue(0);
+ public static final ConfigOption<Integer> SINK_PARALLELISM = ConfigOptions.key("sink.parallelism").intType().defaultValue(0);
+
+ public static final ConfigOption<Integer> WATERMARK_TIME = ConfigOptions.key("watermark.time").intType().defaultValue(0);
+ public static final ConfigOption<Integer> WINDOW_TIME_MINUTE = ConfigOptions.key("window.time.minute").intType().defaultValue(0);
+ public static final ConfigOption<Integer> TOP_LIMIT = ConfigOptions.key("top.limit").intType().defaultValue(0);
+
+ public static final ConfigOption<String> KAFKA_CONSUMER_USER = ConfigOptions.key("kafka.consumer.user").stringType().defaultValue("");
+ public static final ConfigOption<String> KAFKA_CONSUMER_PIN = ConfigOptions.key("kafka.consumer.pin").stringType().defaultValue("");
+ public static final ConfigOption<Integer> KAFKA_CONSUMER_SECURITY = ConfigOptions.key("kafka.consumer.security").intType().defaultValue(0);
+ public static final ConfigOption<String> TOOLS_CONSUMER_LIBRARY = ConfigOptions.key("tools.consumer.library").stringType().defaultValue("");
+
+ public static final ConfigOption<String> KAFKA_PRODUCER_USER = ConfigOptions.key("kafka.producer.user").stringType().defaultValue("");
+ public static final ConfigOption<String> KAFKA_PRODUCER_PIN = ConfigOptions.key("kafka.producer.pin").stringType().defaultValue("");
+ public static final ConfigOption<Integer> KAFKA_PRODUCER_SECURITY = ConfigOptions.key("kafka.producer.security").intType().defaultValue(0);
+ public static final ConfigOption<String> TOOLS_PRODUCER_LIBRARY = ConfigOptions.key("tools.producer.library").stringType().defaultValue("");
+
+
+
+}
diff --git a/src/main/java/com/galaxy/tsg/config/commonConfigurations.java b/src/main/java/com/galaxy/tsg/config/CommonConfigurations.java
index 1da5d7f..447724f 100644
--- a/src/main/java/com/galaxy/tsg/config/commonConfigurations.java
+++ b/src/main/java/com/galaxy/tsg/config/CommonConfigurations.java
@@ -9,7 +9,7 @@ import java.util.Properties;
* @author Administrator
*/
-public final class commonConfigurations {
+public final class CommonConfigurations {
private static Properties propService = new Properties();
@@ -54,7 +54,7 @@ public final class commonConfigurations {
static {
try {
- propService.load(commonConfigurations.class.getClassLoader().getResourceAsStream("common.properties"));
+ propService.load(CommonConfigurations.class.getClassLoader().getResourceAsStream("common.properties"));
} catch (Exception e) {
propService = null;
}
diff --git a/src/main/java/com/galaxy/tsg/config/commonConfig.java b/src/main/java/com/galaxy/tsg/config/commonConfig.java
deleted file mode 100644
index 2136a62..0000000
--- a/src/main/java/com/galaxy/tsg/config/commonConfig.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package com.galaxy.tsg.config;
-
-/**
- * Created by wk on 2021/1/6.
- */
-public class commonConfig {
-
-
- public static final String KAFKA_CONSUMER_BROKER = commonConfigurations.getStringProperty("kafka.consumer.broker");
- public static final String KAFKA_CONSUMER_GROUP_ID = commonConfigurations.getStringProperty("kafka.consumer.group.id");
- public static final String KAFKA_CONSUMER_TOPIC = commonConfigurations.getStringProperty("kafka.consumer.topic");
- public static final int KAFKA_CONSUMER_PARALLELISM = commonConfigurations.getIntProperty("kafka.consumer.parallelism");
- public static final String KAFKA_CONSUMER_SESSION_TIMEOUT_MS= commonConfigurations.getStringProperty("kafka.consumer.session.timeout.ms");
- public static final String KAFKA_CONSUMER_MAX_POLL_RECORD= commonConfigurations.getStringProperty("kafka.consumer.max.poll.records");
- public static final String KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES= commonConfigurations.getStringProperty("kafka.consumer.max.partition.fetch.bytes");
-
-
-
- public static final String KAFKA_PRODUCER_TOPIC = commonConfigurations.getStringProperty("kafka.producer.topic");
- public static final String KAFKA_PRODUCER_RETRIES = commonConfigurations.getStringProperty("kafka.producer.retries");
- public static final String KAFKA_PRODUCER_LINGER_MS = commonConfigurations.getStringProperty("kafka.producer.linger.ms");
- public static final String KAFKA_PRODUCER_REQUEST_TIMEOUT_MS = commonConfigurations.getStringProperty("kafka.producer.request.timeout.ms");
- public static final String KAFKA_PRODUCER_BATCH_SIZE = commonConfigurations.getStringProperty("kafka.producer.batch.size");
- public static final String KAFKA_PRODUCER_BUFFER_MEMORY = commonConfigurations.getStringProperty("kafka.producer.buffer.memory");
- public static final String KAFKA_PRODUCER_MAX_REQUEST_SIZE = commonConfigurations.getStringProperty("kafka.producer.max.request.size");
- public static final String KAFKA_PRODUCER_COMPRESSION_TYPE = commonConfigurations.getStringProperty("kafka.producer.compression.type");
- public static final String KAFKA_PRODUCER_BROKER = commonConfigurations.getStringProperty("kafka_producer_broker");
-
-
-
- public static final String JOB_NAME = commonConfigurations.getStringProperty("job.name");
- public static final int TASK_PARALLELISM = commonConfigurations.getIntProperty("task.parallelism");
- public static final int ORDERBY_PARALLELISM = commonConfigurations.getIntProperty("orderby.parallelism");
- public static final int SINK_PARALLELISM = commonConfigurations.getIntProperty("sink.parallelism");
-
- public static final int WATERMARK_TIME = commonConfigurations.getIntProperty("watermark.time");
- public static final int WINDOW_TIME_MINUTE = commonConfigurations.getIntProperty("window.time.minute");
- public static final int TOP_LIMIT = commonConfigurations.getIntProperty("top.limit");
-
- public static final String KAFKA_CONSUMER_USER = commonConfigurations.getStringProperty("kafka.consumer.user");
- public static final String KAFKA_CONSUMER_PIN = commonConfigurations.getStringProperty("kafka.consumer.pin");
- public static final int KAFKA_CONSUMER_SECURITY = commonConfigurations.getIntProperty("kafka.consumer.security");
- public static final String TOOLS_CONSUMER_LIBRARY = commonConfigurations.getStringProperty("tools.consumer.library");
-
- public static final String KAFKA_PRODUCER_USER = commonConfigurations.getStringProperty("kafka.producer.user");
- public static final String KAFKA_PRODUCER_PIN = commonConfigurations.getStringProperty("kafka.producer.pin");
- public static final int KAFKA_PRODUCER_SECURITY = commonConfigurations.getIntProperty("kafka.producer.security");
- public static final String TOOLS_PRODUCER_LIBRARY = commonConfigurations.getStringProperty("tools.producer.library");
-
-
-
-}
diff --git a/src/main/java/com/galaxy/tsg/function/FlatMapFunction.java b/src/main/java/com/galaxy/tsg/function/FlatMapFunction.java
new file mode 100644
index 0000000..6cfad66
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/function/FlatMapFunction.java
@@ -0,0 +1,89 @@
+package com.galaxy.tsg.function;
+
+import com.galaxy.tsg.pojo.SessionEntity;
+import com.galaxy.tsg.pojo.TransformEntity;
+import com.geedgenetworks.utils.FormatUtils;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlatMapFunction implements org.apache.flink.api.common.functions.FlatMapFunction<SessionEntity, TransformEntity> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlatMapFunction.class);
+
+
+ @Override
+ public void flatMap(SessionEntity sessionEntity, Collector<TransformEntity> out) throws Exception {
+ TransformEntity transformEntity = new TransformEntity();
+ try {
+ transformEntity.setServer_ip(sessionEntity.getServer_ip());
+ transformEntity.setClient_ip(sessionEntity.getClient_ip());
+ transformEntity.setSubscriber_id(sessionEntity.getSubscriber_id());
+ transformEntity.setFqdn(sessionEntity.getServer_fqdn());
+
+ if(sessionEntity.getHttp_host()!=null && !sessionEntity.getHttp_host().isEmpty()){
+ transformEntity.setDomain(FormatUtils.getTopPrivateDomain(sessionEntity.getHttp_host()));
+ }
+ else if(sessionEntity.getSsl_sni()!=null && !sessionEntity.getSsl_sni().isEmpty()) {
+ transformEntity.setDomain(FormatUtils.getTopPrivateDomain(sessionEntity.getSsl_sni()));
+ }
+ else if(sessionEntity.getDtls_sni()!=null && !sessionEntity.getDtls_sni().isEmpty()){
+ transformEntity.setDomain(FormatUtils.getTopPrivateDomain(sessionEntity.getDtls_sni()));
+ }
+ if(sessionEntity.getQuic_sni()!=null && !sessionEntity.getQuic_sni().isEmpty()){
+ transformEntity.setDomain(FormatUtils.getTopPrivateDomain(sessionEntity.getQuic_sni()));
+ }
+ if(transformEntity.getDomain()!=null && transformEntity.getDomain().contains("InternetDomain")){
+ System.out.println(transformEntity.getDomain());
+ }
+ transformEntity.setDevice_group(sessionEntity.getDevice_group());
+ transformEntity.setDevice_id(sessionEntity.getDevice_id());
+ transformEntity.setData_center(sessionEntity.getData_center());
+ transformEntity.setVsys_id(sessionEntity.getVsys_id());
+ transformEntity.setTimestamp(sessionEntity.getRecv_time()/1000);
+ transformEntity.setSessions(1);
+//
+
+
+ if ((8L & sessionEntity.getFlags()) == 8L) {
+
+ transformEntity.setOut_bytes(sessionEntity.getSent_bytes());
+ transformEntity.setOut_pkts(sessionEntity.getSent_pkts());
+ transformEntity.setIn_bytes(sessionEntity.getReceived_bytes());
+ transformEntity.setIn_pkts(sessionEntity.getReceived_pkts());
+
+ } else {
+ transformEntity.setOut_bytes(sessionEntity.getReceived_bytes());
+ transformEntity.setOut_pkts(sessionEntity.getReceived_pkts());
+ transformEntity.setIn_bytes(sessionEntity.getSent_bytes());
+ transformEntity.setIn_pkts(sessionEntity.getSent_pkts());
+
+
+ }
+ if(sessionEntity.getFlags()>0) {
+
+ if ((8L & sessionEntity.getFlags()) == 8L && (16L & sessionEntity.getFlags()) == 16L) {
+ transformEntity.setInternal_ip(sessionEntity.getServer_ip());
+ out.collect(transformEntity);
+ transformEntity.setInternal_ip(sessionEntity.getClient_ip());
+
+ } else if ((8L & sessionEntity.getFlags()) == 8L && (16L & sessionEntity.getFlags()) != 16L) {
+ transformEntity.setInternal_ip(sessionEntity.getClient_ip());
+ transformEntity.setExternal_ip(sessionEntity.getServer_ip());
+ } else if ((8L & sessionEntity.getFlags()) != 8L && (16L & sessionEntity.getFlags()) == 16L) {
+ transformEntity.setInternal_ip(sessionEntity.getServer_ip());
+ transformEntity.setExternal_ip(sessionEntity.getClient_ip());
+ } else if ((8L & sessionEntity.getFlags()) != 8L && (16L & sessionEntity.getFlags()) != 16L) {
+ transformEntity.setExternal_ip(sessionEntity.getServer_ip());
+ out.collect(transformEntity);
+ transformEntity.setExternal_ip(sessionEntity.getClient_ip());
+
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Entity Parsing ERROR");
+ transformEntity.setIfError(1);
+ }
+ out.collect(transformEntity);
+ }
+}
diff --git a/src/main/java/com/galaxy/tsg/function/metricsAggregationReduce.java b/src/main/java/com/galaxy/tsg/function/MetricsAggregationReduce.java
index 63eb9b6..3fb1b47 100644
--- a/src/main/java/com/galaxy/tsg/function/metricsAggregationReduce.java
+++ b/src/main/java/com/galaxy/tsg/function/MetricsAggregationReduce.java
@@ -1,12 +1,12 @@
package com.galaxy.tsg.function;
-import com.galaxy.tsg.pojo.transformEntity;
+import com.galaxy.tsg.pojo.TransformEntity;
import org.apache.flink.api.common.functions.ReduceFunction;
-public class metricsAggregationReduce implements ReduceFunction<transformEntity> {
+public class MetricsAggregationReduce implements ReduceFunction<TransformEntity> {
@Override
- public transformEntity reduce(transformEntity value1, transformEntity value2) throws Exception {
+ public TransformEntity reduce(TransformEntity value1, TransformEntity value2) throws Exception {
value1.setOut_pkts(value1.getOut_pkts() + value2.getOut_pkts());
value1.setOut_bytes(value1.getOut_bytes() + value2.getOut_bytes());
value1.setIn_bytes(value1.getIn_bytes() + value2.getIn_bytes());
diff --git a/src/main/java/com/galaxy/tsg/function/metricsCalculate.java b/src/main/java/com/galaxy/tsg/function/MetricsCalculate.java
index 67d7264..60d2c36 100644
--- a/src/main/java/com/galaxy/tsg/function/metricsCalculate.java
+++ b/src/main/java/com/galaxy/tsg/function/MetricsCalculate.java
@@ -6,9 +6,9 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
-public class metricsCalculate extends ProcessWindowFunction<
- transformEntity, // 输入类型
- resultEntity, // 输出类型
+public class MetricsCalculate extends ProcessWindowFunction<
+ TransformEntity, // 输入类型
+ ResultEntity, // 输出类型
Tuple5<String, Long, String, String, String>, // 键类型
TimeWindow> { // 窗口类型
@@ -16,25 +16,25 @@ public class metricsCalculate extends ProcessWindowFunction<
@Override
public void process(Tuple5<String, Long, String, String, String> s,
Context context,
- Iterable<transformEntity> elements, Collector<resultEntity> out) throws Exception {
+ Iterable<TransformEntity> elements, Collector<ResultEntity> out) throws Exception {
if (elements.iterator().hasNext()) {
- transformEntity objectTransformEntity = elements.iterator().next();
- resultEntity enSession = new resultEntity();
+ TransformEntity objectTransformEntity = elements.iterator().next();
+ ResultEntity enSession = new ResultEntity();
enSession.setOrder_by("sessions");
enSession.setStat_time(context.window().getStart());
enSession.setSessionResultEntity(enrichessionResult(context.window().getStart(), objectTransformEntity));
out.collect(enSession);
- resultEntity enPacket = new resultEntity();
+ ResultEntity enPacket = new ResultEntity();
enPacket.setOrder_by("packets");
enPacket.setStat_time(context.window().getStart());
enPacket.setPacketResultEntity(enrichPacketResult(context.window().getStart() , objectTransformEntity));
out.collect(enPacket);
- resultEntity enbyte = new resultEntity();
+ ResultEntity enbyte = new ResultEntity();
enbyte.setOrder_by("bytes");
enbyte.setStat_time(context.window().getStart());
enbyte.setByteResultEntity(enrichByteResult(context.window().getStart(), objectTransformEntity));
@@ -46,8 +46,8 @@ public class metricsCalculate extends ProcessWindowFunction<
}
- public byteResultEntity enrichByteResult(Long time, transformEntity objectTransformEntity) {
- byteResultEntity en = new byteResultEntity();
+ public ByteResultEntity enrichByteResult(Long time, TransformEntity objectTransformEntity) {
+ ByteResultEntity en = new ByteResultEntity();
en.setVsys_id(objectTransformEntity.getVsys_id());
en.setTimestamp_ms(time);
en.setSessions(objectTransformEntity.getSessions());
@@ -72,9 +72,9 @@ public class metricsCalculate extends ProcessWindowFunction<
}
- public sessionResultEntity enrichessionResult(Long time, transformEntity objectTransformEntity) {
+ public SessionResultEntity enrichessionResult(Long time, TransformEntity objectTransformEntity) {
- sessionResultEntity en = new sessionResultEntity();
+ SessionResultEntity en = new SessionResultEntity();
en.setVsys_id(objectTransformEntity.getVsys_id());
en.setTimestamp_ms(time);
en.setSessions(objectTransformEntity.getSessions());
@@ -96,8 +96,8 @@ public class metricsCalculate extends ProcessWindowFunction<
return en;
}
- public packetResultEntity enrichPacketResult(Long time, transformEntity objectTransformEntity) {
- packetResultEntity en = new packetResultEntity();
+ public PacketResultEntity enrichPacketResult(Long time, TransformEntity objectTransformEntity) {
+ PacketResultEntity en = new PacketResultEntity();
en.setVsys_id(objectTransformEntity.getVsys_id());
en.setTimestamp_ms(time);
en.setSessions(objectTransformEntity.getSessions());
diff --git a/src/main/java/com/galaxy/tsg/function/topnHotItems.java b/src/main/java/com/galaxy/tsg/function/TopnHotItems.java
index c42b031..4736459 100644
--- a/src/main/java/com/galaxy/tsg/function/topnHotItems.java
+++ b/src/main/java/com/galaxy/tsg/function/TopnHotItems.java
@@ -11,14 +11,14 @@ import org.apache.flink.util.Collector;
import java.util.Map;
import java.util.PriorityQueue;
-public class topnHotItems extends KeyedProcessFunction<Tuple1<String>, resultEntity, String> {
+public class TopnHotItems extends KeyedProcessFunction<Tuple1<String>, ResultEntity, String> {
private final int topSize;
- private PriorityQueue<sessionResultEntity> sessionOrderEntity ;
- private PriorityQueue<packetResultEntity> packetOrderEntity ;
- private PriorityQueue<byteResultEntity> byteOrderEntity ;
+ private PriorityQueue<SessionResultEntity> sessionOrderEntity ;
+ private PriorityQueue<PacketResultEntity> packetOrderEntity ;
+ private PriorityQueue<ByteResultEntity> byteOrderEntity ;
- public topnHotItems(int i) {
+ public TopnHotItems(int i) {
this.topSize = i;
@@ -36,7 +36,7 @@ public class topnHotItems extends KeyedProcessFunction<Tuple1<String>, resultEn
@Override
- public void processElement(resultEntity objectEntity, Context context, Collector<String> collector) {
+ public void processElement(ResultEntity objectEntity, Context context, Collector<String> collector) {
@@ -49,7 +49,7 @@ public class topnHotItems extends KeyedProcessFunction<Tuple1<String>, resultEn
sessionOrderEntity.add(objectEntity.getSessionResultEntity());
} else {
if (sessionOrderEntity.peek() != null) {
- sessionResultEntity res=sessionOrderEntity.peek();
+ SessionResultEntity res=sessionOrderEntity.peek();
if (res.getSessions() <= objectEntity.getSessionResultEntity().getSessions()) {
sessionOrderEntity.poll();
sessionOrderEntity.add(objectEntity.getSessionResultEntity());
@@ -63,7 +63,7 @@ public class topnHotItems extends KeyedProcessFunction<Tuple1<String>, resultEn
packetOrderEntity.add(objectEntity.getPacketResultEntity());
} else {
if (packetOrderEntity.peek() != null) {
- packetResultEntity res=packetOrderEntity.peek();
+ PacketResultEntity res=packetOrderEntity.peek();
if ((res.getIn_pkts()+res.getOut_pkts()) <= (objectEntity.getPacketResultEntity().getIn_pkts()+objectEntity.getPacketResultEntity().getOut_pkts())) {
packetOrderEntity.poll();
packetOrderEntity.add(objectEntity.getPacketResultEntity());
@@ -76,7 +76,7 @@ public class topnHotItems extends KeyedProcessFunction<Tuple1<String>, resultEn
byteOrderEntity.add(objectEntity.getByteResultEntity());
} else {
if (byteOrderEntity.peek() != null) {
- byteResultEntity res=byteOrderEntity.peek();
+ ByteResultEntity res=byteOrderEntity.peek();
if ((res.getIn_bytes()+res.getOut_bytes()) <= (objectEntity.getByteResultEntity().getIn_bytes()+objectEntity.getByteResultEntity().getOut_bytes())) {
byteOrderEntity.poll();
byteOrderEntity.add(objectEntity.getByteResultEntity());
@@ -96,9 +96,9 @@ public class topnHotItems extends KeyedProcessFunction<Tuple1<String>, resultEn
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
- for(sessionResultEntity en : sessionOrderEntity) {
+ for(SessionResultEntity en : sessionOrderEntity) {
- metricsEntity metricsEntity = new metricsEntity();
+ MetricsEntity metricsEntity = new MetricsEntity();
metricsEntity.setName("sessions_top_" + en.getKey_by());
Map tags = new LinkedMap();
tags.put("vsys_id", en.getVsys_id());
@@ -145,10 +145,10 @@ public class topnHotItems extends KeyedProcessFunction<Tuple1<String>, resultEn
}
- for(packetResultEntity en : packetOrderEntity){
+ for(PacketResultEntity en : packetOrderEntity){
- metricsEntity metricsEntity = new metricsEntity();
+ MetricsEntity metricsEntity = new MetricsEntity();
metricsEntity.setName("packets_top_" + en.getKey_by());
Map tags = new LinkedMap();
tags.put("vsys_id", en.getVsys_id());
@@ -195,15 +195,9 @@ public class topnHotItems extends KeyedProcessFunction<Tuple1<String>, resultEn
}
- for(byteResultEntity en : byteOrderEntity){
+ for(ByteResultEntity en : byteOrderEntity){
-
-
-
-
-
-
- metricsEntity metricsEntity = new metricsEntity();
+ MetricsEntity metricsEntity = new MetricsEntity();
metricsEntity.setName("bytes_top_" + en.getKey_by());
Map tags = new LinkedMap();
tags.put("vsys_id", en.getVsys_id());
diff --git a/src/main/java/com/galaxy/tsg/pojo/byteResultEntity.java b/src/main/java/com/galaxy/tsg/pojo/ByteResultEntity.java
index 4b0849e..6771bc7 100644
--- a/src/main/java/com/galaxy/tsg/pojo/byteResultEntity.java
+++ b/src/main/java/com/galaxy/tsg/pojo/ByteResultEntity.java
@@ -1,6 +1,6 @@
package com.galaxy.tsg.pojo;
-public class byteResultEntity implements Comparable<byteResultEntity> {
+public class ByteResultEntity implements Comparable<ByteResultEntity> {
@@ -191,7 +191,7 @@ public class byteResultEntity implements Comparable<byteResultEntity> {
}
@Override
- public int compareTo(byteResultEntity per) {
+ public int compareTo(ByteResultEntity per) {
if((this.out_bytes+this.in_bytes)>=(per.out_bytes+per.in_bytes)){
return 1 ;
}else{
diff --git a/src/main/java/com/galaxy/tsg/pojo/metricsEntity.java b/src/main/java/com/galaxy/tsg/pojo/MetricsEntity.java
index 09fe562..c0e4b31 100644
--- a/src/main/java/com/galaxy/tsg/pojo/metricsEntity.java
+++ b/src/main/java/com/galaxy/tsg/pojo/MetricsEntity.java
@@ -2,7 +2,7 @@ package com.galaxy.tsg.pojo;
import java.util.Map;
-public class metricsEntity {
+public class MetricsEntity {
private String name;
private Map<String,String> tags;
private Map<String,Long> fields;
diff --git a/src/main/java/com/galaxy/tsg/pojo/packetResultEntity.java b/src/main/java/com/galaxy/tsg/pojo/PacketResultEntity.java
index cc722b5..2bae32a 100644
--- a/src/main/java/com/galaxy/tsg/pojo/packetResultEntity.java
+++ b/src/main/java/com/galaxy/tsg/pojo/PacketResultEntity.java
@@ -1,6 +1,6 @@
package com.galaxy.tsg.pojo;
-public class packetResultEntity implements Comparable<packetResultEntity>, Cloneable {
+public class PacketResultEntity implements Comparable<PacketResultEntity>, Cloneable {
private long timestamp_ms;
@@ -189,7 +189,7 @@ public class packetResultEntity implements Comparable<packetResultEntity>, Clone
}
@Override
- public int compareTo(packetResultEntity per) {
+ public int compareTo(PacketResultEntity per) {
if((this.out_pkts+this.in_pkts)>=(per.out_pkts+per.in_pkts)){
return 1 ;
}else{
diff --git a/src/main/java/com/galaxy/tsg/pojo/ResultEntity.java b/src/main/java/com/galaxy/tsg/pojo/ResultEntity.java
new file mode 100644
index 0000000..710fba4
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/pojo/ResultEntity.java
@@ -0,0 +1,56 @@
+package com.galaxy.tsg.pojo;
+
+public class ResultEntity {
+
+
+
+ private String order_by;
+ private Long stat_time;
+
+ private SessionResultEntity sessionResultEntity;
+ private PacketResultEntity packetResultEntity;
+ private ByteResultEntity byteResultEntity;
+
+
+ public String getOrder_by() {
+ return order_by;
+ }
+
+
+
+ public void setOrder_by(String order_by) {
+ this.order_by = order_by;
+ }
+
+ public Long getStat_time() {
+ return stat_time;
+ }
+
+ public void setStat_time(Long stat_time) {
+ this.stat_time = stat_time;
+ }
+
+ public SessionResultEntity getSessionResultEntity() {
+ return sessionResultEntity;
+ }
+
+ public void setSessionResultEntity(SessionResultEntity sessionResultEntity) {
+ this.sessionResultEntity = sessionResultEntity;
+ }
+
+ public PacketResultEntity getPacketResultEntity() {
+ return packetResultEntity;
+ }
+
+ public void setPacketResultEntity(PacketResultEntity packetResultEntity) {
+ this.packetResultEntity = packetResultEntity;
+ }
+
+ public ByteResultEntity getByteResultEntity() {
+ return byteResultEntity;
+ }
+
+ public void setByteResultEntity(ByteResultEntity byteResultEntity) {
+ this.byteResultEntity = byteResultEntity;
+ }
+}
diff --git a/src/main/java/com/galaxy/tsg/pojo/SessionEntity.java b/src/main/java/com/galaxy/tsg/pojo/SessionEntity.java
new file mode 100644
index 0000000..83bd02d
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/pojo/SessionEntity.java
@@ -0,0 +1,200 @@
+package com.galaxy.tsg.pojo;
+
+import java.io.Serializable;
+
+public class SessionEntity implements Serializable{
+
+ public String client_ip ;
+ public String app ;
+ public long recv_time ;
+ public String server_fqdn;
+ public long flags;
+ public String decoded_as ;
+ public String server_ip ;
+ public String http_host ;
+ public String ssl_sni ;
+ public String dtls_sni ;
+ public String quic_sni ;
+ public long vsys_id ;
+ public String device_group ;
+ public String device_id ;
+ public String data_center;
+
+ public String subscriber_id;
+ public long sent_pkts;
+ public long received_pkts;
+ public long sent_bytes ;
+ public long received_bytes;
+
+
+ public SessionEntity() {
+ }
+
+ public String getDevice_id() {
+ return device_id;
+ }
+
+ public void setDevice_id(String device_id) {
+ this.device_id = device_id;
+ }
+
+ public String getClient_ip() {
+ return client_ip;
+ }
+
+ public void setClient_ip(String client_ip) {
+ this.client_ip = client_ip;
+ }
+
+ public String getApp() {
+ return app;
+ }
+
+ public void setApp(String common_app_label) {
+ this.app = app;
+ }
+
+ public long getRecv_time() {
+ return recv_time;
+ }
+
+ public void setRecv_time(long recv_time) {
+ this.recv_time = recv_time;
+ }
+
+ public String getDecoded_as() {
+ return decoded_as;
+ }
+
+ public void setDecoded_as(String decoded_as) {
+ this.decoded_as = decoded_as;
+ }
+
+ public String getServer_ip() {
+ return server_ip;
+ }
+
+ public void setServer_ip(String server_ip) {
+ this.server_ip = server_ip;
+ }
+
+
+ public String getHttp_host() {
+ return http_host;
+ }
+
+ public void setHttp_host(String http_host) {
+ this.http_host = http_host;
+ }
+
+ public String getSsl_sni() {
+ return ssl_sni;
+ }
+
+ public void setSsl_sni(String ssl_sni) {
+ this.ssl_sni = ssl_sni;
+ }
+
+ public String getDtls_sni() {
+ return dtls_sni;
+ }
+
+ public void setDtls_sni(String dtls_sni) {
+ this.dtls_sni = dtls_sni;
+ }
+
+ public String getQuic_sni() {
+ return quic_sni;
+ }
+
+ public void setQuic_sni(String quic_sni) {
+ this.quic_sni = quic_sni;
+ }
+
+ public long getVsys_id() {
+ return vsys_id;
+ }
+
+ public void setVsys_id(long common_vsys_id) {
+ this.vsys_id = vsys_id;
+ }
+
+ public String getDevice_group() {
+ return device_group;
+ }
+
+ public void setDevice_group(String device_group) {
+ this.device_group = device_group;
+ }
+
+
+ public String getData_center() {
+ return data_center;
+ }
+
+ public void setData_center(String data_center) {
+ this.data_center = data_center;
+ }
+
+
+
+ public String getSubscriber_id() {
+ return subscriber_id;
+ }
+
+ public void setSubscriber_id(String subscriber_id) {
+ this.subscriber_id = subscriber_id;
+ }
+
+
+ public long getSent_pkts() {
+ return sent_pkts;
+ }
+
+ public void setSent_pkts(long sent_pkts) {
+ this.sent_pkts = sent_pkts;
+ }
+
+ public long getReceived_pkts() {
+ return received_pkts;
+ }
+
+ public void setReceived_pkts(long received_pkts) {
+ this.received_pkts = received_pkts;
+ }
+
+ public long getSent_bytes() {
+ return sent_bytes;
+ }
+
+ public void setSent_bytes(long sent_bytes) {
+ this.sent_bytes = sent_bytes;
+ }
+
+ public long getReceived_bytes() {
+ return received_bytes;
+ }
+
+ public void setReceived_bytes(long received_bytes) {
+ this.received_bytes = received_bytes;
+ }
+
+
+ public String getServer_fqdn() {
+ return server_fqdn;
+ }
+
+ public void setServer_fqdn(String server_fqdn) {
+ this.server_fqdn = server_fqdn;
+ }
+
+ public long getFlags() {
+ return flags;
+ }
+
+ public void setFlags(long flags) {
+ this.flags = flags;
+ }
+
+
+}
diff --git a/src/main/java/com/galaxy/tsg/pojo/sessionResultEntity.java b/src/main/java/com/galaxy/tsg/pojo/SessionResultEntity.java
index 63a3ec8..e368a96 100644
--- a/src/main/java/com/galaxy/tsg/pojo/sessionResultEntity.java
+++ b/src/main/java/com/galaxy/tsg/pojo/SessionResultEntity.java
@@ -1,6 +1,6 @@
package com.galaxy.tsg.pojo;
-public class sessionResultEntity implements Comparable<sessionResultEntity> {
+public class SessionResultEntity implements Comparable<SessionResultEntity> {
@@ -189,7 +189,7 @@ public class sessionResultEntity implements Comparable<sessionResultEntity> {
}
@Override
- public int compareTo(sessionResultEntity per) {
+ public int compareTo(SessionResultEntity per) {
if(this.sessions>=per.sessions){
return 1 ;
}else{
diff --git a/src/main/java/com/galaxy/tsg/pojo/transformEntity.java b/src/main/java/com/galaxy/tsg/pojo/TransformEntity.java
index 4759e63..f1dcb5e 100644
--- a/src/main/java/com/galaxy/tsg/pojo/transformEntity.java
+++ b/src/main/java/com/galaxy/tsg/pojo/TransformEntity.java
@@ -2,32 +2,32 @@ package com.galaxy.tsg.pojo;
import java.io.Serializable;
-public class transformEntity implements Serializable {
-
- public int ifError;
- public long timestamp;
-
-
- public String fqdn;
- public String server_ip ;
- public String domain ;
- public long vsys_id ;
- public String device_group ;
- public String client_ip ;
- public String device_id;
- public String data_center;
- public String internal_ip;
- public String external_ip;
- public String subscriber_id;
- public long sessions;
- public long out_bytes;
- public long in_bytes;
- public long out_pkts ;
- public long in_pkts ;
- public String key_by;
- public String l4_protocol;
-
- public transformEntity() {
+public class TransformEntity implements Serializable,Cloneable {
+
+ private int ifError;
+ private long timestamp;
+
+
+ private String fqdn;
+ private String server_ip ;
+ private String domain ;
+ private long vsys_id ;
+ private String device_group ;
+ private String client_ip ;
+ private String device_id;
+ private String data_center;
+ private String internal_ip;
+ private String external_ip;
+ private String subscriber_id;
+ private long sessions;
+ private long out_bytes;
+ private long in_bytes;
+ private long out_pkts ;
+ private long in_pkts ;
+ private String key_by;
+ private String l4_protocol;
+
+ public TransformEntity() {
}
public String getL4_protocol() {
@@ -191,4 +191,9 @@ public class transformEntity implements Serializable {
public void setKey_by(String key_by) {
this.key_by = key_by;
}
+
+ @Override
+ public Object clone() throws CloneNotSupportedException {
+ return super.clone();
+ }
}
diff --git a/src/main/java/com/galaxy/tsg/pojo/resultEntity.java b/src/main/java/com/galaxy/tsg/pojo/resultEntity.java
deleted file mode 100644
index 0593d8e..0000000
--- a/src/main/java/com/galaxy/tsg/pojo/resultEntity.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package com.galaxy.tsg.pojo;
-
-public class resultEntity {
-
-
-
- private String order_by;
- private Long stat_time;
-
- private com.galaxy.tsg.pojo.sessionResultEntity sessionResultEntity;
- private com.galaxy.tsg.pojo.packetResultEntity packetResultEntity;
- private com.galaxy.tsg.pojo.byteResultEntity byteResultEntity;
-
-
- public String getOrder_by() {
- return order_by;
- }
-
-
-
- public void setOrder_by(String order_by) {
- this.order_by = order_by;
- }
-
- public Long getStat_time() {
- return stat_time;
- }
-
- public void setStat_time(Long stat_time) {
- this.stat_time = stat_time;
- }
-
- public com.galaxy.tsg.pojo.sessionResultEntity getSessionResultEntity() {
- return sessionResultEntity;
- }
-
- public void setSessionResultEntity(com.galaxy.tsg.pojo.sessionResultEntity sessionResultEntity) {
- this.sessionResultEntity = sessionResultEntity;
- }
-
- public com.galaxy.tsg.pojo.packetResultEntity getPacketResultEntity() {
- return packetResultEntity;
- }
-
- public void setPacketResultEntity(com.galaxy.tsg.pojo.packetResultEntity packetResultEntity) {
- this.packetResultEntity = packetResultEntity;
- }
-
- public com.galaxy.tsg.pojo.byteResultEntity getByteResultEntity() {
- return byteResultEntity;
- }
-
- public void setByteResultEntity(com.galaxy.tsg.pojo.byteResultEntity byteResultEntity) {
- this.byteResultEntity = byteResultEntity;
- }
-}
diff --git a/src/main/java/com/galaxy/tsg/pojo/sessionEntity.java b/src/main/java/com/galaxy/tsg/pojo/sessionEntity.java
deleted file mode 100644
index 73e2c29..0000000
--- a/src/main/java/com/galaxy/tsg/pojo/sessionEntity.java
+++ /dev/null
@@ -1,204 +0,0 @@
-package com.galaxy.tsg.pojo;
-
-import java.io.Serializable;
-
-public class sessionEntity implements Serializable {
-
- public String common_client_ip ;
- public String common_app_label ;
- public long common_recv_time ;
- public String common_server_fqdn;
- public long common_flags;
- public String common_schema_type ;
- public String common_server_ip ;
- public String http_domain ;
- public long common_vsys_id ;
- public String common_device_group ;
- public String common_device_id ;
- public String common_data_center;
- public String common_l4_protocol;
- public String common_internal_ip;
- public String common_external_ip;
- public String common_subscriber_id;
- public long common_sessions;
- public long common_c2s_pkt_num;
- public long common_s2c_pkt_num;
- public long common_c2s_byte_num ;
- public long common_s2c_byte_num ;
-
-
- public sessionEntity() {
- }
-
- public String getCommon_device_id() {
- return common_device_id;
- }
-
- public void setCommon_device_id(String common_device_id) {
- this.common_device_id = common_device_id;
- }
-
- public String getCommon_client_ip() {
- return common_client_ip;
- }
-
- public void setCommon_client_ip(String common_client_ip) {
- this.common_client_ip = common_client_ip;
- }
-
- public String getCommon_app_label() {
- return common_app_label;
- }
-
- public void setCommon_app_label(String common_app_label) {
- this.common_app_label = common_app_label;
- }
-
- public long getCommon_recv_time() {
- return common_recv_time;
- }
-
- public void setCommon_recv_time(long common_recv_time) {
- this.common_recv_time = common_recv_time;
- }
-
- public String getCommon_schema_type() {
- return common_schema_type;
- }
-
- public void setCommon_schema_type(String common_schema_type) {
- this.common_schema_type = common_schema_type;
- }
-
- public String getCommon_server_ip() {
- return common_server_ip;
- }
-
- public void setCommon_server_ip(String common_server_ip) {
- this.common_server_ip = common_server_ip;
- }
-
-
-
- public String getHttp_domain() {
- return http_domain;
- }
-
- public void setHttp_domain(String http_domain) {
- this.http_domain = http_domain;
- }
-
- public long getCommon_vsys_id() {
- return common_vsys_id;
- }
-
- public void setCommon_vsys_id(long common_vsys_id) {
- this.common_vsys_id = common_vsys_id;
- }
-
- public String getCommon_device_group() {
- return common_device_group;
- }
-
- public void setCommon_device_group(String common_device_group) {
- this.common_device_group = common_device_group;
- }
-
-
- public String getCommon_data_center() {
- return common_data_center;
- }
-
- public void setCommon_data_center(String common_data_center) {
- this.common_data_center = common_data_center;
- }
-
- public String getCommon_l4_protocol() {
- return common_l4_protocol;
- }
-
- public void setCommon_l4_protocol(String common_l4_protocol) {
- this.common_l4_protocol = common_l4_protocol;
- }
-
- public String getCommon_internal_ip() {
- return common_internal_ip;
- }
-
- public void setCommon_internal_ip(String common_internal_ip) {
- this.common_internal_ip = common_internal_ip;
- }
-
- public String getCommon_external_ip() {
- return common_external_ip;
- }
-
- public void setCommon_external_ip(String common_external_ip) {
- this.common_external_ip = common_external_ip;
- }
-
- public String getCommon_subscriber_id() {
- return common_subscriber_id;
- }
-
- public void setCommon_subscriber_id(String common_subscriber_id) {
- this.common_subscriber_id = common_subscriber_id;
- }
-
- public long getCommon_sessions() {
- return common_sessions;
- }
-
- public void setCommon_sessions(long common_sessions) {
- this.common_sessions = common_sessions;
- }
-
- public long getCommon_c2s_pkt_num() {
- return common_c2s_pkt_num;
- }
-
- public void setCommon_c2s_pkt_num(long common_c2s_pkt_num) {
- this.common_c2s_pkt_num = common_c2s_pkt_num;
- }
-
- public long getCommon_s2c_pkt_num() {
- return common_s2c_pkt_num;
- }
-
- public void setCommon_s2c_pkt_num(long common_s2c_pkt_num) {
- this.common_s2c_pkt_num = common_s2c_pkt_num;
- }
-
- public long getCommon_c2s_byte_num() {
- return common_c2s_byte_num;
- }
-
- public void setCommon_c2s_byte_num(long common_c2s_byte_num) {
- this.common_c2s_byte_num = common_c2s_byte_num;
- }
-
- public long getCommon_s2c_byte_num() {
- return common_s2c_byte_num;
- }
-
- public void setCommon_s2c_byte_num(long common_s2c_byte_num) {
- this.common_s2c_byte_num = common_s2c_byte_num;
- }
-
-
- public String getCommon_server_fqdn() {
- return common_server_fqdn;
- }
-
- public void setCommon_server_fqdn(String common_server_fqdn) {
- this.common_server_fqdn = common_server_fqdn;
- }
-
- public long getCommon_flags() {
- return common_flags;
- }
-
- public void setCommon_flags(long common_flags) {
- this.common_flags = common_flags;
- }
-}
diff --git a/src/main/java/com/galaxy/tsg/selector/GroupBySelector.java b/src/main/java/com/galaxy/tsg/selector/GroupBySelector.java
new file mode 100644
index 0000000..7c5f727
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/selector/GroupBySelector.java
@@ -0,0 +1,48 @@
+package com.galaxy.tsg.selector;
+
+import com.galaxy.tsg.pojo.TransformEntity;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple5;
+
+public class GroupBySelector implements KeySelector<TransformEntity, Tuple5<String, Long, String, String, String>> {
+
+ public String key;
+
+ public GroupBySelector(String key) {
+ this.key = key;
+ }
+
+ @Override
+ public Tuple5<String, Long, String, String, String> getKey(TransformEntity transformEntity) throws Exception {
+
+ Tuple5<String, Long, String, String, String> tuple = null;
+ transformEntity.setKey_by(key);
+ switch (key) {
+ case "client_ip":
+ tuple = new Tuple5<>(transformEntity.getClient_ip(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
+ break;
+ case "server_ip":
+ tuple = new Tuple5<>(transformEntity.getServer_ip(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
+ break;
+ case "internal_ip":
+ tuple = new Tuple5<>(transformEntity.getInternal_ip(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
+ break;
+ case "external_ip":
+ tuple = new Tuple5<>(transformEntity.getExternal_ip(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
+ break;
+ case "server_domain":
+ tuple = new Tuple5<>(transformEntity.getDomain(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
+ break;
+ case "subscriber_id":
+ tuple = new Tuple5<>(transformEntity.getSubscriber_id(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
+ break;
+ case "server_fqdn":
+ tuple = new Tuple5<>(transformEntity.getFqdn(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id());
+ break;
+
+ default:
+
+ }
+ return tuple;
+ }
+}
diff --git a/src/main/java/com/galaxy/tsg/selector/OrderBySelector.java b/src/main/java/com/galaxy/tsg/selector/OrderBySelector.java
new file mode 100644
index 0000000..cf3b23a
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/selector/OrderBySelector.java
@@ -0,0 +1,13 @@
+package com.galaxy.tsg.selector;
+
+import com.galaxy.tsg.pojo.ResultEntity;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple1;
+
+public class OrderBySelector implements KeySelector<ResultEntity, Tuple1<String>> {
+
+ @Override
+ public Tuple1<String> getKey(ResultEntity entity) throws Exception {
+ return new Tuple1<>(entity.getOrder_by());
+ }
+}
diff --git a/src/main/java/com/galaxy/tsg/util/KafkaUtils.java b/src/main/java/com/galaxy/tsg/util/KafkaUtils.java
new file mode 100644
index 0000000..9aae43f
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/util/KafkaUtils.java
@@ -0,0 +1,114 @@
+package com.galaxy.tsg.util;
+
+import com.galaxy.tsg.config.CommonConfig;
+import com.galaxy.tsg.pojo.SessionEntity;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.kafka.common.config.SslConfigs;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static com.galaxy.tsg.config.CommonConfig.KAFKA_CONSUMER_TOPIC;
+import static com.galaxy.tsg.config.CommonConfig.KAFKA_PRODUCER_TOPIC;
+
+public class KafkaUtils {
+
+
+ public static Properties getKafkaSourceProperty(Configuration configuration) {
+ Properties properties = new Properties();
+ properties.setProperty("group.id", configuration.getString(CommonConfig.KAFKA_CONSUMER_GROUP_ID));
+ properties.setProperty("bootstrap.servers",configuration.getString(CommonConfig.KAFKA_CONSUMER_BROKER));
+ properties.setProperty("session.timeout.ms", configuration.getString(CommonConfig.KAFKA_CONSUMER_SESSION_TIMEOUT_MS));
+ properties.setProperty("max.poll.records", configuration.getString(CommonConfig.KAFKA_CONSUMER_MAX_POLL_RECORD));
+ properties.setProperty("max.partition.fetch.bytes", configuration.getString(CommonConfig.KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES));
+
+
+ switch (configuration.getInteger(CommonConfig.KAFKA_CONSUMER_SECURITY)) {
+ case 1:
+ properties.put("security.protocol", "SSL");
+ properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+ properties.put("ssl.keystore.location", configuration.getString(CommonConfig.TOOLS_CONSUMER_LIBRARY) + "keystore.jks");
+ properties.put("ssl.keystore.password", configuration.getString(CommonConfig.KAFKA_CONSUMER_PIN));
+ properties.put("ssl.truststore.location", configuration.getString(CommonConfig.TOOLS_CONSUMER_LIBRARY) + "truststore.jks");
+ properties.put("ssl.truststore.password", configuration.getString(CommonConfig.KAFKA_CONSUMER_PIN));
+ properties.put("ssl.key.password", configuration.getString(CommonConfig.KAFKA_CONSUMER_PIN));
+ break;
+ case 2:
+ properties.put("security.protocol", "SASL_PLAINTEXT");
+ properties.put("sasl.mechanism", "PLAIN");
+ properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ + configuration.getString(CommonConfig.KAFKA_CONSUMER_USER) + " password=" + configuration.getString(CommonConfig.KAFKA_CONSUMER_PIN) + ";");
+ break;
+ default:
+ }
+
+ return properties;
+ }
+ private static Properties getKafkaSinkProperty(Configuration configuration) {
+ Properties properties = new Properties();
+ properties.setProperty("bootstrap.servers",configuration.getString(CommonConfig.KAFKA_PRODUCER_BROKER));
+
+ properties.put("acks", "1");
+ properties.put("retries", configuration.getString(CommonConfig.KAFKA_PRODUCER_RETRIES));
+ properties.put("linger.ms", configuration.getString(CommonConfig.KAFKA_PRODUCER_LINGER_MS));
+ properties.put("request.timeout.ms", configuration.getString(CommonConfig.KAFKA_PRODUCER_REQUEST_TIMEOUT_MS));
+ properties.put("batch.size", configuration.getString(CommonConfig.KAFKA_PRODUCER_BATCH_SIZE));
+ properties.put("buffer.memory", configuration.getString(CommonConfig.KAFKA_PRODUCER_BUFFER_MEMORY));
+ properties.put("max.request.size", configuration.getString(CommonConfig.KAFKA_PRODUCER_MAX_REQUEST_SIZE));
+ properties.put("compression.type", configuration.getString(CommonConfig.KAFKA_PRODUCER_COMPRESSION_TYPE));
+ switch (configuration.getInteger(CommonConfig.KAFKA_PRODUCER_SECURITY)) {
+ case 1:
+ properties.put("security.protocol", "SSL");
+ properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+ properties.put("ssl.keystore.location", configuration.getString(CommonConfig.TOOLS_PRODUCER_LIBRARY) + "keystore.jks");
+ properties.put("ssl.keystore.password", configuration.getString(CommonConfig.KAFKA_PRODUCER_PIN));
+ properties.put("ssl.truststore.location", configuration.getString(CommonConfig.TOOLS_PRODUCER_LIBRARY) + "truststore.jks");
+ properties.put("ssl.truststore.password", configuration.getString(CommonConfig.KAFKA_PRODUCER_PIN));
+ properties.put("ssl.key.password", configuration.getString(CommonConfig.KAFKA_PRODUCER_PIN));
+ break;
+ case 2:
+ properties.put("security.protocol", "SASL_PLAINTEXT");
+ properties.put("sasl.mechanism", "PLAIN");
+ properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ + configuration.getString(CommonConfig.KAFKA_PRODUCER_USER) + " password=" + configuration.getString(CommonConfig.KAFKA_PRODUCER_PIN) + ";");
+ break;
+ default:
+ }
+ return properties;
+ }
+
+ public static FlinkKafkaConsumer<SessionEntity> getKafkaConsumer(Configuration configuration) {
+ FlinkKafkaConsumer<SessionEntity> kafkaConsumer = new FlinkKafkaConsumer<>(configuration.getString(KAFKA_CONSUMER_TOPIC),
+ new TimestampDeserializationSchema(), getKafkaSourceProperty(configuration));
+
+ kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
+ kafkaConsumer.setStartFromGroupOffsets();
+
+ return kafkaConsumer;
+ }
+ /* public static FlinkKafkaConsumer<String> getKafkaConsumerLists(List<String> topic) {
+ FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic,
+ new SimpleStringSchema(), getKafkaSourceProperty(configuration));
+
+ kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
+ kafkaConsumer.setStartFromGroupOffsets();
+
+ return kafkaConsumer;
+ }*/
+ public static SinkFunction<String> getKafkaSink(Configuration configuration) {
+ FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<String>(
+ configuration.getString(KAFKA_PRODUCER_TOPIC),
+ new SimpleStringSchema(),
+ getKafkaSinkProperty(configuration),
+ Optional.empty()
+ );
+ flinkKafkaProducer.setLogFailuresOnly(false);
+ return flinkKafkaProducer;
+ }
+
+
+}
diff --git a/src/main/java/com/galaxy/tsg/util/TimestampDeserializationSchema.java b/src/main/java/com/galaxy/tsg/util/TimestampDeserializationSchema.java
new file mode 100644
index 0000000..c811094
--- /dev/null
+++ b/src/main/java/com/galaxy/tsg/util/TimestampDeserializationSchema.java
@@ -0,0 +1,48 @@
+package com.galaxy.tsg.util;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson2.JSON;
+import com.galaxy.tsg.pojo.SessionEntity;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * @author qidaijie
+ * @version 2022/3/89:42
+ */
+public class TimestampDeserializationSchema implements KafkaDeserializationSchema {
+ private static final Log logger = LogFactory.get();
+
+ @Override
+ public TypeInformation getProducedType() {
+ return TypeInformation.of(new TypeHint<SessionEntity>() {});
+ }
+
+ @Override
+ public boolean isEndOfStream(Object nextElement) {
+ return false;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public SessionEntity deserialize(ConsumerRecord record) throws Exception {
+ if (record != null) {
+ try {
+
+
+ SessionEntity sessionEntity = JSON.parseObject((byte[]) record.value(), SessionEntity.class);
+ sessionEntity.setRecv_time(record.timestamp());
+
+ return sessionEntity;
+ } catch (RuntimeException e) {
+ logger.error(
+ "KafkaConsumer Deserialize failed,The exception is : " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+ return new SessionEntity();
+ }
+}
diff --git a/src/main/java/com/galaxy/tsg/util/kafkaUtils.java b/src/main/java/com/galaxy/tsg/util/kafkaUtils.java
deleted file mode 100644
index 5374b8b..0000000
--- a/src/main/java/com/galaxy/tsg/util/kafkaUtils.java
+++ /dev/null
@@ -1,109 +0,0 @@
-package com.galaxy.tsg.util;
-
-import com.galaxy.tsg.config.commonConfig;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-import org.apache.kafka.common.config.SslConfigs;
-
-import java.util.List;
-import java.util.Optional;
-import java.util.Properties;
-
-public class kafkaUtils {
-
-
- public static Properties getKafkaSourceProperty() {
- Properties properties = new Properties();
- properties.setProperty("group.id", commonConfig.KAFKA_CONSUMER_GROUP_ID);
- properties.setProperty("bootstrap.servers", commonConfig.KAFKA_CONSUMER_BROKER);
- properties.setProperty("session.timeout.ms", commonConfig.KAFKA_CONSUMER_SESSION_TIMEOUT_MS);
- properties.setProperty("max.poll.records", commonConfig.KAFKA_CONSUMER_MAX_POLL_RECORD);
- properties.setProperty("max.partition.fetch.bytes", commonConfig.KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES);
-
- switch (commonConfig.KAFKA_CONSUMER_SECURITY) {
- case 1:
- properties.put("security.protocol", "SSL");
- properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
- properties.put("ssl.keystore.location", commonConfig.TOOLS_CONSUMER_LIBRARY + "keystore.jks");
- properties.put("ssl.keystore.password", commonConfig.KAFKA_CONSUMER_PIN);
- properties.put("ssl.truststore.location", commonConfig.TOOLS_CONSUMER_LIBRARY + "truststore.jks");
- properties.put("ssl.truststore.password", commonConfig.KAFKA_CONSUMER_PIN);
- properties.put("ssl.key.password", commonConfig.KAFKA_CONSUMER_PIN);
- break;
- case 2:
- properties.put("security.protocol", "SASL_PLAINTEXT");
- properties.put("sasl.mechanism", "PLAIN");
- properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
- + commonConfig.KAFKA_CONSUMER_USER + " password=" + commonConfig.KAFKA_CONSUMER_PIN + ";");
- break;
- default:
- }
-
- return properties;
- }
-
- private static Properties getKafkaSinkProperty() {
- Properties properties = new Properties();
- properties.setProperty("bootstrap.servers", commonConfig.KAFKA_PRODUCER_BROKER);
-
- properties.put("acks", "1");
- properties.put("retries", commonConfig.KAFKA_PRODUCER_RETRIES);
- properties.put("linger.ms", commonConfig.KAFKA_PRODUCER_LINGER_MS);
- properties.put("request.timeout.ms", commonConfig.KAFKA_PRODUCER_REQUEST_TIMEOUT_MS);
- properties.put("batch.size", commonConfig.KAFKA_PRODUCER_BATCH_SIZE);
- properties.put("buffer.memory", commonConfig.KAFKA_PRODUCER_BUFFER_MEMORY);
- properties.put("max.request.size", commonConfig.KAFKA_PRODUCER_MAX_REQUEST_SIZE);
- properties.put("compression.type", commonConfig.KAFKA_PRODUCER_COMPRESSION_TYPE);
-
- switch (commonConfig.KAFKA_PRODUCER_SECURITY) {
- case 1:
- properties.put("security.protocol", "SSL");
- properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
- properties.put("ssl.keystore.location", commonConfig.TOOLS_PRODUCER_LIBRARY + "keystore.jks");
- properties.put("ssl.keystore.password", commonConfig.KAFKA_PRODUCER_PIN);
- properties.put("ssl.truststore.location", commonConfig.TOOLS_PRODUCER_LIBRARY + "truststore.jks");
- properties.put("ssl.truststore.password", commonConfig.KAFKA_PRODUCER_PIN);
- properties.put("ssl.key.password", commonConfig.KAFKA_PRODUCER_PIN);
- break;
- case 2:
- properties.put("security.protocol", "SASL_PLAINTEXT");
- properties.put("sasl.mechanism", "PLAIN");
- properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
- + commonConfig.KAFKA_PRODUCER_USER + " password=" + commonConfig.KAFKA_PRODUCER_PIN + ";");
- break;
- default:
- }
- return properties;
- }
-
- public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic) {
- FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic,
- new SimpleStringSchema(), getKafkaSourceProperty());
-
- kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
- kafkaConsumer.setStartFromGroupOffsets();
-
- return kafkaConsumer;
- }
- public static FlinkKafkaConsumer<String> getKafkaConsumerLists(List<String> topic) {
- FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic,
- new SimpleStringSchema(), getKafkaSourceProperty());
-
- kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
- kafkaConsumer.setStartFromGroupOffsets();
-
- return kafkaConsumer;
- }
- public static SinkFunction<String> getKafkaSink(String topic) {
- return new FlinkKafkaProducer<String>(
- topic,
- new SimpleStringSchema(),
- getKafkaSinkProperty(),
- Optional.empty()
- );
- }
-
-
-}