diff options
| author | wangkuan <[email protected]> | 2024-02-06 15:38:42 +0800 |
|---|---|---|
| committer | wangkuan <[email protected]> | 2024-02-06 15:38:42 +0800 |
| commit | 6d68355b5e1e412fbceed9af70310137b41fffc7 (patch) | |
| tree | aad2fa66d31f707e472a6f0541f5467c9d48aa26 | |
| parent | 6e683191c209855924e938b9b9dd5475458c0b7f (diff) | |
26 files changed, 932 insertions, 770 deletions
@@ -6,7 +6,7 @@ <groupId>com.galaxy.tsg</groupId> <artifactId>topn-metrics-job</artifactId> - <version>23-11-01</version> + <version>24-01-16</version> <repositories> <repository> @@ -21,11 +21,13 @@ <hadoop.version>2.7.1</hadoop.version> </properties> - <dependencies> + +<dependencies> + <dependency> - <groupId>com.zdjizhi</groupId> + <groupId>com.geedgenetworks</groupId> <artifactId>galaxy</artifactId> - <version>1.1.0</version> + <version>1.2.3</version> <exclusions> <exclusion> <artifactId>slf4j-log4j12</artifactId> @@ -37,6 +39,7 @@ </exclusion> </exclusions> </dependency> + <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> @@ -149,6 +152,12 @@ <version>3.3.0</version> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.12</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> @@ -181,7 +190,7 @@ </goals> <configuration> - <finalName>topn-metrics-job-23-11-01</finalName> + <finalName>topn-metrics-job-24-01-16</finalName> <filters> <filter> <!-- Do not copy the signatures in the META-INF folder. diff --git a/src/main/java/com/galaxy/tsg/Toptask.java b/src/main/java/com/galaxy/tsg/Toptask.java index a9cb6d2..6926c3c 100644 --- a/src/main/java/com/galaxy/tsg/Toptask.java +++ b/src/main/java/com/galaxy/tsg/Toptask.java @@ -1,19 +1,18 @@ package com.galaxy.tsg; -import com.alibaba.fastjson2.JSON; -import com.galaxy.tsg.function.metricsAggregationReduce; -import com.galaxy.tsg.function.metricsCalculate; -import com.galaxy.tsg.function.topnHotItems; -import com.galaxy.tsg.pojo.resultEntity; -import com.galaxy.tsg.pojo.sessionEntity; -import com.galaxy.tsg.pojo.transformEntity; -import com.zdjizhi.utils.StringUtil; +import com.galaxy.tsg.config.CommonConfig; +import com.galaxy.tsg.function.*; +import com.galaxy.tsg.pojo.ResultEntity; +import com.galaxy.tsg.pojo.SessionEntity; +import com.galaxy.tsg.pojo.TransformEntity; +import com.galaxy.tsg.selector.GroupBySelector; +import com.galaxy.tsg.selector.OrderBySelector; +import com.geedgenetworks.utils.StringUtil; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -23,249 +22,125 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; +import java.util.Objects; -import static com.galaxy.tsg.config.commonConfig.*; -import static com.galaxy.tsg.util.kafkaUtils.getKafkaConsumer; -import static com.galaxy.tsg.util.kafkaUtils.getKafkaSink; +import static com.galaxy.tsg.config.CommonConfig.*; +import static com.galaxy.tsg.util.KafkaUtils.getKafkaConsumer; +import static com.galaxy.tsg.util.KafkaUtils.getKafkaSink; public class Toptask { private static final Logger LOG = LoggerFactory.getLogger(Toptask.class); public static void main(String[] args) throws Exception { - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + ParameterTool serviceConfig = ParameterTool.fromPropertiesFile(args[0]); + Configuration configurationService = serviceConfig.getConfiguration(); + // global register + env.getConfig().setGlobalJobParameters(configurationService); + WatermarkStrategy<SessionEntity> strategyForSession = WatermarkStrategy + .<SessionEntity>forBoundedOutOfOrderness(Duration.ofSeconds(configurationService.get(CommonConfig.WATERMARK_TIME))) + .withTimestampAssigner((sessionEntity, timestamp) -> sessionEntity.getRecv_time()); - DataStream<String> sourceForSession = env.addSource(getKafkaConsumer(KAFKA_CONSUMER_TOPIC)).setParallelism(KAFKA_CONSUMER_PARALLELISM); - WatermarkStrategy<transformEntity> strategyForSession = WatermarkStrategy - .<transformEntity>forBoundedOutOfOrderness(Duration.ofSeconds(WATERMARK_TIME)) - .withTimestampAssigner((transformEntity, timestamp) -> transformEntity.getTimestamp()*1000); - - - SingleOutputStreamOperator<transformEntity> inputForSession = sourceForSession.map(new MapFunction<String, transformEntity>() { + DataStream<SessionEntity> sourceForSession = env.addSource(getKafkaConsumer(configurationService)).setParallelism(configurationService.get(CommonConfig.KAFKA_CONSUMER_PARALLELISM)).assignTimestampsAndWatermarks(strategyForSession);; + SingleOutputStreamOperator<TransformEntity> inputForSession = sourceForSession.flatMap(new FlatMapFunction() + ).filter(new FilterFunction<TransformEntity>() { @Override - public transformEntity map(String message) { - transformEntity transformEntity = new transformEntity(); - try { - sessionEntity sessionEntity = JSON.parseObject(message, com.galaxy.tsg.pojo.sessionEntity.class); - transformEntity.setServer_ip(sessionEntity.getCommon_server_ip()); - transformEntity.setClient_ip(sessionEntity.getCommon_client_ip()); - - transformEntity.setSubscriber_id(sessionEntity.getCommon_subscriber_id()); - transformEntity.setFqdn(sessionEntity.getCommon_server_fqdn()); - transformEntity.setExternal_ip(sessionEntity.getCommon_external_ip()); - transformEntity.setInternal_ip(sessionEntity.getCommon_internal_ip()); - transformEntity.setDomain(sessionEntity.getHttp_domain()); - transformEntity.setDevice_group(sessionEntity.getCommon_device_group()); - transformEntity.setDevice_id(sessionEntity.getCommon_device_id()); - transformEntity.setData_center(sessionEntity.getCommon_data_center()); - transformEntity.setVsys_id(sessionEntity.getCommon_vsys_id()); - transformEntity.setTimestamp(sessionEntity.getCommon_recv_time()); - transformEntity.setSessions(sessionEntity.getCommon_sessions()); - - transformEntity.setL4_protocol(sessionEntity.getCommon_l4_protocol()); - - - if ((8L & sessionEntity.getCommon_flags()) == 8L) { - - transformEntity.setOut_bytes(sessionEntity.getCommon_c2s_byte_num()); - transformEntity.setOut_pkts(sessionEntity.getCommon_c2s_pkt_num()); - transformEntity.setIn_bytes(sessionEntity.getCommon_s2c_byte_num()); - transformEntity.setIn_pkts(sessionEntity.getCommon_s2c_pkt_num()); - - } else { - transformEntity.setOut_bytes(sessionEntity.getCommon_s2c_byte_num()); - transformEntity.setOut_pkts(sessionEntity.getCommon_s2c_pkt_num()); - transformEntity.setIn_bytes(sessionEntity.getCommon_c2s_byte_num()); - transformEntity.setIn_pkts(sessionEntity.getCommon_c2s_pkt_num()); - + public boolean filter(TransformEntity entity) throws Exception { - } - - - } catch (Exception e) { - LOG.error("Entity Parsing ERROR"); - transformEntity.setIfError(1); - } - return transformEntity; - } - }).filter(new FilterFunction<transformEntity>() { - @Override - public boolean filter(transformEntity entity) throws Exception { - - return entity.ifError != 1; + return entity.getIfError() != 1; } }); - - //clientip聚合TOP - - SingleOutputStreamOperator<transformEntity> clientipdStream = inputForSession.filter(new FilterFunction<transformEntity>() { - @Override - public boolean filter(transformEntity value) throws Exception { - return "IPv6_TCP".equals(value.getL4_protocol()) || "IPv4_TCP".equals(value.getL4_protocol()); - } - }).assignTimestampsAndWatermarks(strategyForSession); - - SingleOutputStreamOperator<resultEntity> windowedStream = clientipdStream.keyBy(new groupBySelector("client_ip")) - .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("client_ip");; - DataStream<String> Stream = windowedStream.keyBy(new oneKeySelector()) - .process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM); - Stream.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM); - - - //serverip聚合TOP - - - SingleOutputStreamOperator<transformEntity> serveripdStream = inputForSession.filter(new FilterFunction<transformEntity>() { - @Override - public boolean filter(transformEntity value) throws Exception { - return "IPv6_TCP".equals(value.getL4_protocol()) || "IPv4_TCP".equals(value.getL4_protocol()); - } - }).assignTimestampsAndWatermarks(strategyForSession); - - SingleOutputStreamOperator<resultEntity> windowedStreamForServerIp = serveripdStream.keyBy(new groupBySelector("server_ip")) - .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("server_ip");; - DataStream<String> StreamForServerIp = windowedStreamForServerIp.keyBy(new oneKeySelector()) - .process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM); - StreamForServerIp.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM); - - - //common_internal_ip聚合TOP - SingleOutputStreamOperator<transformEntity> internalStream = inputForSession.filter(new FilterFunction<transformEntity>() { - @Override - public boolean filter(transformEntity value) throws Exception { - return StringUtil.isNotEmpty(value.getInternal_ip()); - } - }).assignTimestampsAndWatermarks(strategyForSession); - - SingleOutputStreamOperator<resultEntity> windowedStreamForInternal = internalStream.keyBy(new groupBySelector("internal_ip")) - .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("internal_ip");; - DataStream<String> StreamForInternal = windowedStreamForInternal.keyBy(new oneKeySelector()) - .process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM); - StreamForInternal.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM); - - //common_external_ip聚合TOP - - SingleOutputStreamOperator<transformEntity> externalStream = inputForSession.filter(new FilterFunction<transformEntity>() { - @Override - public boolean filter(transformEntity value) throws Exception { - return StringUtil.isNotEmpty(value.getExternal_ip()); - } - }).assignTimestampsAndWatermarks(strategyForSession); - - SingleOutputStreamOperator<resultEntity> windowedStreamForExternal = externalStream.keyBy(new groupBySelector("external_ip")) - .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("external_ip");; - DataStream<String> StreamForExternal = windowedStreamForExternal.keyBy(new oneKeySelector()) - .process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM); - StreamForExternal.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM); - - //http_domain聚合TOP - - SingleOutputStreamOperator<transformEntity> domainStream = inputForSession.filter(new FilterFunction<transformEntity>() { - @Override - public boolean filter(transformEntity value) throws Exception { - return StringUtil.isNotEmpty(value.getDomain()); - } - }).assignTimestampsAndWatermarks(strategyForSession); - - SingleOutputStreamOperator<resultEntity> windowedStreamForDomain = domainStream.keyBy(new groupBySelector("server_domain")) - .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("server_domain");; - DataStream<String> StreamForDomain = windowedStreamForDomain.keyBy(new oneKeySelector()) - .process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM); - StreamForDomain.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM); - - SingleOutputStreamOperator<transformEntity> userStream = inputForSession.filter(new FilterFunction<transformEntity>() { - @Override - public boolean filter(transformEntity value) throws Exception { - return StringUtil.isNotEmpty(value.getSubscriber_id()); - } - }).assignTimestampsAndWatermarks(strategyForSession); - - //common_subscriber_id聚合TOP - SingleOutputStreamOperator<resultEntity> windowedStreamForUser = userStream.keyBy(new groupBySelector("subscriber_id")) - .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("subscriber_id");; - DataStream<String> StreamForUser = windowedStreamForUser.keyBy(new oneKeySelector()) - .process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM); - StreamForUser.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM); - - SingleOutputStreamOperator<transformEntity> fqdnStream = inputForSession.filter(new FilterFunction<transformEntity>() { - @Override - public boolean filter(transformEntity value) throws Exception { - return StringUtil.isNotEmpty(value.getFqdn()); - } - }).assignTimestampsAndWatermarks(strategyForSession); - - SingleOutputStreamOperator<resultEntity> windowedStreamForFqdn = fqdnStream.keyBy(new groupBySelector("server_fqdn")) - .window(TumblingEventTimeWindows.of(Time.minutes(WINDOW_TIME_MINUTE))) - .reduce(new metricsAggregationReduce(), new metricsCalculate()).setParallelism(TASK_PARALLELISM).name("server_fqdn"); - DataStream<String> StreamForFqdn = windowedStreamForFqdn.keyBy(new oneKeySelector()) - .process(new topnHotItems(TOP_LIMIT)).setParallelism(ORDERBY_PARALLELISM); - StreamForFqdn.addSink(getKafkaSink(KAFKA_PRODUCER_TOPIC)).setParallelism(SINK_PARALLELISM); - - env.execute(JOB_NAME); + buildJobTuple(configurationService,inputForSession,"client_ip"); + buildJobTuple(configurationService,inputForSession,"server_ip"); + buildJobTuple(configurationService,inputForSession,"internal_ip"); + buildJobTuple(configurationService,inputForSession,"external_ip"); + buildJobTuple(configurationService,inputForSession,"server_domain"); + buildJobTuple(configurationService,inputForSession,"subscriber_id"); + buildJobTuple(configurationService,inputForSession,"server_fqdn"); + env.execute(configurationService.getString(JOB_NAME)); } - public static class groupBySelector implements KeySelector<transformEntity, Tuple5<String, Long, String, String, String>> { - public String key; + private static void buildJobTuple(Configuration configurationService, SingleOutputStreamOperator<TransformEntity> inputForSession, String key) { - public groupBySelector(String key) { - this.key = key; - } - - @Override - public Tuple5<String, Long, String, String, String> getKey(transformEntity transformEntity) throws Exception { - - Tuple5<String, Long, String, String, String> tuple = null; - transformEntity.setKey_by(key); - switch (key) { - case "client_ip": - tuple = new Tuple5<>(transformEntity.getClient_ip(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id()); - break; - case "server_ip": - tuple = new Tuple5<>(transformEntity.getServer_ip(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id()); - break; - case "internal_ip": - tuple = new Tuple5<>(transformEntity.getInternal_ip(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id()); - break; - case "external_ip": - tuple = new Tuple5<>(transformEntity.getExternal_ip(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id()); - break; - case "server_domain": - tuple = new Tuple5<>(transformEntity.getDomain(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id()); - break; + SingleOutputStreamOperator<TransformEntity> event = null; + switch (key) { + case "client_ip": + event= inputForSession.filter(new FilterFunction<TransformEntity>() { + @Override + public boolean filter(TransformEntity value) throws Exception { + return StringUtil.isNotEmpty(value.getClient_ip()); + } + }); + break; + case "server_ip": + event= inputForSession.filter(new FilterFunction<TransformEntity>() { + @Override + public boolean filter(TransformEntity value) throws Exception { + return StringUtil.isNotEmpty(value.getServer_ip()); + } + }); + break; + case "internal_ip": + event= inputForSession.filter(new FilterFunction<TransformEntity>() { + @Override + public boolean filter(TransformEntity value) throws Exception { + return StringUtil.isNotEmpty(value.getInternal_ip()); + } + }); + break; + case "external_ip": + event= inputForSession.filter(new FilterFunction<TransformEntity>() { + @Override + public boolean filter(TransformEntity value) throws Exception { + return StringUtil.isNotEmpty(value.getExternal_ip()); + } + }); + break; + case "server_domain": + event= inputForSession.filter(new FilterFunction<TransformEntity>() { + @Override + public boolean filter(TransformEntity value) throws Exception { + return StringUtil.isNotEmpty(value.getDomain()); + } + }); + break; + case "subscriber_id": + event= inputForSession.filter(new FilterFunction<TransformEntity>() { + @Override + public boolean filter(TransformEntity value) throws Exception { + return StringUtil.isNotEmpty(value.getSubscriber_id()); + } + }); + break; + case "server_fqdn": + event= inputForSession.filter(new FilterFunction<TransformEntity>() { + @Override + public boolean filter(TransformEntity value) throws Exception { + return StringUtil.isNotEmpty(value.getFqdn()); + } + }); + break; - case "subscriber_id": - tuple = new Tuple5<>(transformEntity.getSubscriber_id(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id()); - break; - case "server_fqdn": - tuple = new Tuple5<>(transformEntity.getFqdn(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id()); - break; + default: + break; + } + SingleOutputStreamOperator<ResultEntity> windowedStreamForFqdn = Objects.requireNonNull(event).keyBy(new GroupBySelector(key)) + .window(TumblingEventTimeWindows.of(Time.minutes(configurationService.getInteger(CommonConfig.WINDOW_TIME_MINUTE)))) + .reduce(new MetricsAggregationReduce(), new MetricsCalculate()).setParallelism(configurationService.getInteger(CommonConfig.TASK_PARALLELISM)).name(key); + DataStream<String> StreamForKey = windowedStreamForFqdn.keyBy(new OrderBySelector()) + .process(new TopnHotItems(configurationService.getInteger(CommonConfig.TOP_LIMIT))).setParallelism(configurationService.getInteger(CommonConfig.ORDERBY_PARALLELISM)); + StreamForKey.addSink(getKafkaSink(configurationService)).setParallelism(configurationService.getInteger(CommonConfig.SINK_PARALLELISM)); - default: - } - return tuple; } - } - - public static class oneKeySelector implements KeySelector<resultEntity, Tuple1<String>> { - @Override - public Tuple1<String> getKey(resultEntity entity) throws Exception { - return new Tuple1<>(entity.getOrder_by()); - } - } }
\ No newline at end of file diff --git a/src/main/java/com/galaxy/tsg/config/CommonConfig.java b/src/main/java/com/galaxy/tsg/config/CommonConfig.java new file mode 100644 index 0000000..db1896a --- /dev/null +++ b/src/main/java/com/galaxy/tsg/config/CommonConfig.java @@ -0,0 +1,53 @@ +package com.galaxy.tsg.config; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +/** + * Created by wk on 2021/1/6. + */ +public class CommonConfig { + + public static final ConfigOption<String> KAFKA_CONSUMER_BROKER = ConfigOptions.key("kafka.consumer.broker").stringType().defaultValue(""); + public static final ConfigOption<String> KAFKA_CONSUMER_GROUP_ID = ConfigOptions.key("kafka.consumer.group.id").stringType().defaultValue(""); + public static final ConfigOption<String> KAFKA_CONSUMER_TOPIC = ConfigOptions.key("kafka.consumer.topic").stringType().defaultValue(""); + public static final ConfigOption<Integer> KAFKA_CONSUMER_PARALLELISM = ConfigOptions.key("kafka.consumer.parallelism").intType().defaultValue(0); + public static final ConfigOption<String> KAFKA_CONSUMER_SESSION_TIMEOUT_MS= ConfigOptions.key("kafka.consumer.session.timeout.ms").stringType().defaultValue("60000"); + public static final ConfigOption<String> KAFKA_CONSUMER_MAX_POLL_RECORD= ConfigOptions.key("kafka.consumer.max.poll.records").stringType().defaultValue("3000"); + public static final ConfigOption<String> KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES= ConfigOptions.key("kafka.consumer.max.partition.fetch.bytes").stringType().defaultValue("31457280"); + + + + public static final ConfigOption<String> KAFKA_PRODUCER_TOPIC = ConfigOptions.key("kafka.producer.topic").stringType().defaultValue(""); + public static final ConfigOption<String> KAFKA_PRODUCER_RETRIES = ConfigOptions.key("kafka.producer.retries").stringType().defaultValue("1"); + public static final ConfigOption<String> KAFKA_PRODUCER_LINGER_MS = ConfigOptions.key("kafka.producer.linger.ms").stringType().defaultValue("10"); + public static final ConfigOption<String> KAFKA_PRODUCER_REQUEST_TIMEOUT_MS = ConfigOptions.key("kafka.producer.request.timeout.ms").stringType().defaultValue("30000"); + public static final ConfigOption<String> KAFKA_PRODUCER_BATCH_SIZE = ConfigOptions.key("kafka.producer.batch.size").stringType().defaultValue("262144"); + public static final ConfigOption<String> KAFKA_PRODUCER_BUFFER_MEMORY = ConfigOptions.key("kafka.producer.buffer.memory").stringType().defaultValue("134217728"); + public static final ConfigOption<String> KAFKA_PRODUCER_MAX_REQUEST_SIZE = ConfigOptions.key("kafka.producer.max.request.size").stringType().defaultValue("10485760"); + public static final ConfigOption<String> KAFKA_PRODUCER_COMPRESSION_TYPE = ConfigOptions.key("kafka.producer.compression.type").stringType().defaultValue("none"); + public static final ConfigOption<String> KAFKA_PRODUCER_BROKER = ConfigOptions.key("kafka.producer.broker").stringType().defaultValue(""); + + + public static final ConfigOption<String> JOB_NAME = ConfigOptions.key("job.name").stringType().defaultValue(""); + public static final ConfigOption<Integer> TASK_PARALLELISM = ConfigOptions.key("task.parallelism").intType().defaultValue(0); + public static final ConfigOption<Integer> ORDERBY_PARALLELISM = ConfigOptions.key("orderby.parallelism").intType().defaultValue(0); + public static final ConfigOption<Integer> SINK_PARALLELISM = ConfigOptions.key("sink.parallelism").intType().defaultValue(0); + + public static final ConfigOption<Integer> WATERMARK_TIME = ConfigOptions.key("watermark.time").intType().defaultValue(0); + public static final ConfigOption<Integer> WINDOW_TIME_MINUTE = ConfigOptions.key("window.time.minute").intType().defaultValue(0); + public static final ConfigOption<Integer> TOP_LIMIT = ConfigOptions.key("top.limit").intType().defaultValue(0); + + public static final ConfigOption<String> KAFKA_CONSUMER_USER = ConfigOptions.key("kafka.consumer.user").stringType().defaultValue(""); + public static final ConfigOption<String> KAFKA_CONSUMER_PIN = ConfigOptions.key("kafka.consumer.pin").stringType().defaultValue(""); + public static final ConfigOption<Integer> KAFKA_CONSUMER_SECURITY = ConfigOptions.key("kafka.consumer.security").intType().defaultValue(0); + public static final ConfigOption<String> TOOLS_CONSUMER_LIBRARY = ConfigOptions.key("tools.consumer.library").stringType().defaultValue(""); + + public static final ConfigOption<String> KAFKA_PRODUCER_USER = ConfigOptions.key("kafka.producer.user").stringType().defaultValue(""); + public static final ConfigOption<String> KAFKA_PRODUCER_PIN = ConfigOptions.key("kafka.producer.pin").stringType().defaultValue(""); + public static final ConfigOption<Integer> KAFKA_PRODUCER_SECURITY = ConfigOptions.key("kafka.producer.security").intType().defaultValue(0); + public static final ConfigOption<String> TOOLS_PRODUCER_LIBRARY = ConfigOptions.key("tools.producer.library").stringType().defaultValue(""); + + + +} diff --git a/src/main/java/com/galaxy/tsg/config/commonConfigurations.java b/src/main/java/com/galaxy/tsg/config/CommonConfigurations.java index 1da5d7f..447724f 100644 --- a/src/main/java/com/galaxy/tsg/config/commonConfigurations.java +++ b/src/main/java/com/galaxy/tsg/config/CommonConfigurations.java @@ -9,7 +9,7 @@ import java.util.Properties; * @author Administrator */ -public final class commonConfigurations { +public final class CommonConfigurations { private static Properties propService = new Properties(); @@ -54,7 +54,7 @@ public final class commonConfigurations { static { try { - propService.load(commonConfigurations.class.getClassLoader().getResourceAsStream("common.properties")); + propService.load(CommonConfigurations.class.getClassLoader().getResourceAsStream("common.properties")); } catch (Exception e) { propService = null; } diff --git a/src/main/java/com/galaxy/tsg/config/commonConfig.java b/src/main/java/com/galaxy/tsg/config/commonConfig.java deleted file mode 100644 index 2136a62..0000000 --- a/src/main/java/com/galaxy/tsg/config/commonConfig.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.galaxy.tsg.config; - -/** - * Created by wk on 2021/1/6. - */ -public class commonConfig { - - - public static final String KAFKA_CONSUMER_BROKER = commonConfigurations.getStringProperty("kafka.consumer.broker"); - public static final String KAFKA_CONSUMER_GROUP_ID = commonConfigurations.getStringProperty("kafka.consumer.group.id"); - public static final String KAFKA_CONSUMER_TOPIC = commonConfigurations.getStringProperty("kafka.consumer.topic"); - public static final int KAFKA_CONSUMER_PARALLELISM = commonConfigurations.getIntProperty("kafka.consumer.parallelism"); - public static final String KAFKA_CONSUMER_SESSION_TIMEOUT_MS= commonConfigurations.getStringProperty("kafka.consumer.session.timeout.ms"); - public static final String KAFKA_CONSUMER_MAX_POLL_RECORD= commonConfigurations.getStringProperty("kafka.consumer.max.poll.records"); - public static final String KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES= commonConfigurations.getStringProperty("kafka.consumer.max.partition.fetch.bytes"); - - - - public static final String KAFKA_PRODUCER_TOPIC = commonConfigurations.getStringProperty("kafka.producer.topic"); - public static final String KAFKA_PRODUCER_RETRIES = commonConfigurations.getStringProperty("kafka.producer.retries"); - public static final String KAFKA_PRODUCER_LINGER_MS = commonConfigurations.getStringProperty("kafka.producer.linger.ms"); - public static final String KAFKA_PRODUCER_REQUEST_TIMEOUT_MS = commonConfigurations.getStringProperty("kafka.producer.request.timeout.ms"); - public static final String KAFKA_PRODUCER_BATCH_SIZE = commonConfigurations.getStringProperty("kafka.producer.batch.size"); - public static final String KAFKA_PRODUCER_BUFFER_MEMORY = commonConfigurations.getStringProperty("kafka.producer.buffer.memory"); - public static final String KAFKA_PRODUCER_MAX_REQUEST_SIZE = commonConfigurations.getStringProperty("kafka.producer.max.request.size"); - public static final String KAFKA_PRODUCER_COMPRESSION_TYPE = commonConfigurations.getStringProperty("kafka.producer.compression.type"); - public static final String KAFKA_PRODUCER_BROKER = commonConfigurations.getStringProperty("kafka_producer_broker"); - - - - public static final String JOB_NAME = commonConfigurations.getStringProperty("job.name"); - public static final int TASK_PARALLELISM = commonConfigurations.getIntProperty("task.parallelism"); - public static final int ORDERBY_PARALLELISM = commonConfigurations.getIntProperty("orderby.parallelism"); - public static final int SINK_PARALLELISM = commonConfigurations.getIntProperty("sink.parallelism"); - - public static final int WATERMARK_TIME = commonConfigurations.getIntProperty("watermark.time"); - public static final int WINDOW_TIME_MINUTE = commonConfigurations.getIntProperty("window.time.minute"); - public static final int TOP_LIMIT = commonConfigurations.getIntProperty("top.limit"); - - public static final String KAFKA_CONSUMER_USER = commonConfigurations.getStringProperty("kafka.consumer.user"); - public static final String KAFKA_CONSUMER_PIN = commonConfigurations.getStringProperty("kafka.consumer.pin"); - public static final int KAFKA_CONSUMER_SECURITY = commonConfigurations.getIntProperty("kafka.consumer.security"); - public static final String TOOLS_CONSUMER_LIBRARY = commonConfigurations.getStringProperty("tools.consumer.library"); - - public static final String KAFKA_PRODUCER_USER = commonConfigurations.getStringProperty("kafka.producer.user"); - public static final String KAFKA_PRODUCER_PIN = commonConfigurations.getStringProperty("kafka.producer.pin"); - public static final int KAFKA_PRODUCER_SECURITY = commonConfigurations.getIntProperty("kafka.producer.security"); - public static final String TOOLS_PRODUCER_LIBRARY = commonConfigurations.getStringProperty("tools.producer.library"); - - - -} diff --git a/src/main/java/com/galaxy/tsg/function/FlatMapFunction.java b/src/main/java/com/galaxy/tsg/function/FlatMapFunction.java new file mode 100644 index 0000000..6cfad66 --- /dev/null +++ b/src/main/java/com/galaxy/tsg/function/FlatMapFunction.java @@ -0,0 +1,89 @@ +package com.galaxy.tsg.function; + +import com.galaxy.tsg.pojo.SessionEntity; +import com.galaxy.tsg.pojo.TransformEntity; +import com.geedgenetworks.utils.FormatUtils; +import org.apache.flink.util.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FlatMapFunction implements org.apache.flink.api.common.functions.FlatMapFunction<SessionEntity, TransformEntity> { + + private static final Logger LOG = LoggerFactory.getLogger(FlatMapFunction.class); + + + @Override + public void flatMap(SessionEntity sessionEntity, Collector<TransformEntity> out) throws Exception { + TransformEntity transformEntity = new TransformEntity(); + try { + transformEntity.setServer_ip(sessionEntity.getServer_ip()); + transformEntity.setClient_ip(sessionEntity.getClient_ip()); + transformEntity.setSubscriber_id(sessionEntity.getSubscriber_id()); + transformEntity.setFqdn(sessionEntity.getServer_fqdn()); + + if(sessionEntity.getHttp_host()!=null && !sessionEntity.getHttp_host().isEmpty()){ + transformEntity.setDomain(FormatUtils.getTopPrivateDomain(sessionEntity.getHttp_host())); + } + else if(sessionEntity.getSsl_sni()!=null && !sessionEntity.getSsl_sni().isEmpty()) { + transformEntity.setDomain(FormatUtils.getTopPrivateDomain(sessionEntity.getSsl_sni())); + } + else if(sessionEntity.getDtls_sni()!=null && !sessionEntity.getDtls_sni().isEmpty()){ + transformEntity.setDomain(FormatUtils.getTopPrivateDomain(sessionEntity.getDtls_sni())); + } + if(sessionEntity.getQuic_sni()!=null && !sessionEntity.getQuic_sni().isEmpty()){ + transformEntity.setDomain(FormatUtils.getTopPrivateDomain(sessionEntity.getQuic_sni())); + } + if(transformEntity.getDomain()!=null && transformEntity.getDomain().contains("InternetDomain")){ + System.out.println(transformEntity.getDomain()); + } + transformEntity.setDevice_group(sessionEntity.getDevice_group()); + transformEntity.setDevice_id(sessionEntity.getDevice_id()); + transformEntity.setData_center(sessionEntity.getData_center()); + transformEntity.setVsys_id(sessionEntity.getVsys_id()); + transformEntity.setTimestamp(sessionEntity.getRecv_time()/1000); + transformEntity.setSessions(1); +// + + + if ((8L & sessionEntity.getFlags()) == 8L) { + + transformEntity.setOut_bytes(sessionEntity.getSent_bytes()); + transformEntity.setOut_pkts(sessionEntity.getSent_pkts()); + transformEntity.setIn_bytes(sessionEntity.getReceived_bytes()); + transformEntity.setIn_pkts(sessionEntity.getReceived_pkts()); + + } else { + transformEntity.setOut_bytes(sessionEntity.getReceived_bytes()); + transformEntity.setOut_pkts(sessionEntity.getReceived_pkts()); + transformEntity.setIn_bytes(sessionEntity.getSent_bytes()); + transformEntity.setIn_pkts(sessionEntity.getSent_pkts()); + + + } + if(sessionEntity.getFlags()>0) { + + if ((8L & sessionEntity.getFlags()) == 8L && (16L & sessionEntity.getFlags()) == 16L) { + transformEntity.setInternal_ip(sessionEntity.getServer_ip()); + out.collect(transformEntity); + transformEntity.setInternal_ip(sessionEntity.getClient_ip()); + + } else if ((8L & sessionEntity.getFlags()) == 8L && (16L & sessionEntity.getFlags()) != 16L) { + transformEntity.setInternal_ip(sessionEntity.getClient_ip()); + transformEntity.setExternal_ip(sessionEntity.getServer_ip()); + } else if ((8L & sessionEntity.getFlags()) != 8L && (16L & sessionEntity.getFlags()) == 16L) { + transformEntity.setInternal_ip(sessionEntity.getServer_ip()); + transformEntity.setExternal_ip(sessionEntity.getClient_ip()); + } else if ((8L & sessionEntity.getFlags()) != 8L && (16L & sessionEntity.getFlags()) != 16L) { + transformEntity.setExternal_ip(sessionEntity.getServer_ip()); + out.collect(transformEntity); + transformEntity.setExternal_ip(sessionEntity.getClient_ip()); + + } + } + } catch (Exception e) { + LOG.error("Entity Parsing ERROR"); + transformEntity.setIfError(1); + } + out.collect(transformEntity); + } +} diff --git a/src/main/java/com/galaxy/tsg/function/metricsAggregationReduce.java b/src/main/java/com/galaxy/tsg/function/MetricsAggregationReduce.java index 63eb9b6..3fb1b47 100644 --- a/src/main/java/com/galaxy/tsg/function/metricsAggregationReduce.java +++ b/src/main/java/com/galaxy/tsg/function/MetricsAggregationReduce.java @@ -1,12 +1,12 @@ package com.galaxy.tsg.function; -import com.galaxy.tsg.pojo.transformEntity; +import com.galaxy.tsg.pojo.TransformEntity; import org.apache.flink.api.common.functions.ReduceFunction; -public class metricsAggregationReduce implements ReduceFunction<transformEntity> { +public class MetricsAggregationReduce implements ReduceFunction<TransformEntity> { @Override - public transformEntity reduce(transformEntity value1, transformEntity value2) throws Exception { + public TransformEntity reduce(TransformEntity value1, TransformEntity value2) throws Exception { value1.setOut_pkts(value1.getOut_pkts() + value2.getOut_pkts()); value1.setOut_bytes(value1.getOut_bytes() + value2.getOut_bytes()); value1.setIn_bytes(value1.getIn_bytes() + value2.getIn_bytes()); diff --git a/src/main/java/com/galaxy/tsg/function/metricsCalculate.java b/src/main/java/com/galaxy/tsg/function/MetricsCalculate.java index 67d7264..60d2c36 100644 --- a/src/main/java/com/galaxy/tsg/function/metricsCalculate.java +++ b/src/main/java/com/galaxy/tsg/function/MetricsCalculate.java @@ -6,9 +6,9 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; -public class metricsCalculate extends ProcessWindowFunction< - transformEntity, // 输入类型 - resultEntity, // 输出类型 +public class MetricsCalculate extends ProcessWindowFunction< + TransformEntity, // 输入类型 + ResultEntity, // 输出类型 Tuple5<String, Long, String, String, String>, // 键类型 TimeWindow> { // 窗口类型 @@ -16,25 +16,25 @@ public class metricsCalculate extends ProcessWindowFunction< @Override public void process(Tuple5<String, Long, String, String, String> s, Context context, - Iterable<transformEntity> elements, Collector<resultEntity> out) throws Exception { + Iterable<TransformEntity> elements, Collector<ResultEntity> out) throws Exception { if (elements.iterator().hasNext()) { - transformEntity objectTransformEntity = elements.iterator().next(); - resultEntity enSession = new resultEntity(); + TransformEntity objectTransformEntity = elements.iterator().next(); + ResultEntity enSession = new ResultEntity(); enSession.setOrder_by("sessions"); enSession.setStat_time(context.window().getStart()); enSession.setSessionResultEntity(enrichessionResult(context.window().getStart(), objectTransformEntity)); out.collect(enSession); - resultEntity enPacket = new resultEntity(); + ResultEntity enPacket = new ResultEntity(); enPacket.setOrder_by("packets"); enPacket.setStat_time(context.window().getStart()); enPacket.setPacketResultEntity(enrichPacketResult(context.window().getStart() , objectTransformEntity)); out.collect(enPacket); - resultEntity enbyte = new resultEntity(); + ResultEntity enbyte = new ResultEntity(); enbyte.setOrder_by("bytes"); enbyte.setStat_time(context.window().getStart()); enbyte.setByteResultEntity(enrichByteResult(context.window().getStart(), objectTransformEntity)); @@ -46,8 +46,8 @@ public class metricsCalculate extends ProcessWindowFunction< } - public byteResultEntity enrichByteResult(Long time, transformEntity objectTransformEntity) { - byteResultEntity en = new byteResultEntity(); + public ByteResultEntity enrichByteResult(Long time, TransformEntity objectTransformEntity) { + ByteResultEntity en = new ByteResultEntity(); en.setVsys_id(objectTransformEntity.getVsys_id()); en.setTimestamp_ms(time); en.setSessions(objectTransformEntity.getSessions()); @@ -72,9 +72,9 @@ public class metricsCalculate extends ProcessWindowFunction< } - public sessionResultEntity enrichessionResult(Long time, transformEntity objectTransformEntity) { + public SessionResultEntity enrichessionResult(Long time, TransformEntity objectTransformEntity) { - sessionResultEntity en = new sessionResultEntity(); + SessionResultEntity en = new SessionResultEntity(); en.setVsys_id(objectTransformEntity.getVsys_id()); en.setTimestamp_ms(time); en.setSessions(objectTransformEntity.getSessions()); @@ -96,8 +96,8 @@ public class metricsCalculate extends ProcessWindowFunction< return en; } - public packetResultEntity enrichPacketResult(Long time, transformEntity objectTransformEntity) { - packetResultEntity en = new packetResultEntity(); + public PacketResultEntity enrichPacketResult(Long time, TransformEntity objectTransformEntity) { + PacketResultEntity en = new PacketResultEntity(); en.setVsys_id(objectTransformEntity.getVsys_id()); en.setTimestamp_ms(time); en.setSessions(objectTransformEntity.getSessions()); diff --git a/src/main/java/com/galaxy/tsg/function/topnHotItems.java b/src/main/java/com/galaxy/tsg/function/TopnHotItems.java index c42b031..4736459 100644 --- a/src/main/java/com/galaxy/tsg/function/topnHotItems.java +++ b/src/main/java/com/galaxy/tsg/function/TopnHotItems.java @@ -11,14 +11,14 @@ import org.apache.flink.util.Collector; import java.util.Map; import java.util.PriorityQueue; -public class topnHotItems extends KeyedProcessFunction<Tuple1<String>, resultEntity, String> { +public class TopnHotItems extends KeyedProcessFunction<Tuple1<String>, ResultEntity, String> { private final int topSize; - private PriorityQueue<sessionResultEntity> sessionOrderEntity ; - private PriorityQueue<packetResultEntity> packetOrderEntity ; - private PriorityQueue<byteResultEntity> byteOrderEntity ; + private PriorityQueue<SessionResultEntity> sessionOrderEntity ; + private PriorityQueue<PacketResultEntity> packetOrderEntity ; + private PriorityQueue<ByteResultEntity> byteOrderEntity ; - public topnHotItems(int i) { + public TopnHotItems(int i) { this.topSize = i; @@ -36,7 +36,7 @@ public class topnHotItems extends KeyedProcessFunction<Tuple1<String>, resultEn @Override - public void processElement(resultEntity objectEntity, Context context, Collector<String> collector) { + public void processElement(ResultEntity objectEntity, Context context, Collector<String> collector) { @@ -49,7 +49,7 @@ public class topnHotItems extends KeyedProcessFunction<Tuple1<String>, resultEn sessionOrderEntity.add(objectEntity.getSessionResultEntity()); } else { if (sessionOrderEntity.peek() != null) { - sessionResultEntity res=sessionOrderEntity.peek(); + SessionResultEntity res=sessionOrderEntity.peek(); if (res.getSessions() <= objectEntity.getSessionResultEntity().getSessions()) { sessionOrderEntity.poll(); sessionOrderEntity.add(objectEntity.getSessionResultEntity()); @@ -63,7 +63,7 @@ public class topnHotItems extends KeyedProcessFunction<Tuple1<String>, resultEn packetOrderEntity.add(objectEntity.getPacketResultEntity()); } else { if (packetOrderEntity.peek() != null) { - packetResultEntity res=packetOrderEntity.peek(); + PacketResultEntity res=packetOrderEntity.peek(); if ((res.getIn_pkts()+res.getOut_pkts()) <= (objectEntity.getPacketResultEntity().getIn_pkts()+objectEntity.getPacketResultEntity().getOut_pkts())) { packetOrderEntity.poll(); packetOrderEntity.add(objectEntity.getPacketResultEntity()); @@ -76,7 +76,7 @@ public class topnHotItems extends KeyedProcessFunction<Tuple1<String>, resultEn byteOrderEntity.add(objectEntity.getByteResultEntity()); } else { if (byteOrderEntity.peek() != null) { - byteResultEntity res=byteOrderEntity.peek(); + ByteResultEntity res=byteOrderEntity.peek(); if ((res.getIn_bytes()+res.getOut_bytes()) <= (objectEntity.getByteResultEntity().getIn_bytes()+objectEntity.getByteResultEntity().getOut_bytes())) { byteOrderEntity.poll(); byteOrderEntity.add(objectEntity.getByteResultEntity()); @@ -96,9 +96,9 @@ public class topnHotItems extends KeyedProcessFunction<Tuple1<String>, resultEn @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) { - for(sessionResultEntity en : sessionOrderEntity) { + for(SessionResultEntity en : sessionOrderEntity) { - metricsEntity metricsEntity = new metricsEntity(); + MetricsEntity metricsEntity = new MetricsEntity(); metricsEntity.setName("sessions_top_" + en.getKey_by()); Map tags = new LinkedMap(); tags.put("vsys_id", en.getVsys_id()); @@ -145,10 +145,10 @@ public class topnHotItems extends KeyedProcessFunction<Tuple1<String>, resultEn } - for(packetResultEntity en : packetOrderEntity){ + for(PacketResultEntity en : packetOrderEntity){ - metricsEntity metricsEntity = new metricsEntity(); + MetricsEntity metricsEntity = new MetricsEntity(); metricsEntity.setName("packets_top_" + en.getKey_by()); Map tags = new LinkedMap(); tags.put("vsys_id", en.getVsys_id()); @@ -195,15 +195,9 @@ public class topnHotItems extends KeyedProcessFunction<Tuple1<String>, resultEn } - for(byteResultEntity en : byteOrderEntity){ + for(ByteResultEntity en : byteOrderEntity){ - - - - - - - metricsEntity metricsEntity = new metricsEntity(); + MetricsEntity metricsEntity = new MetricsEntity(); metricsEntity.setName("bytes_top_" + en.getKey_by()); Map tags = new LinkedMap(); tags.put("vsys_id", en.getVsys_id()); diff --git a/src/main/java/com/galaxy/tsg/pojo/byteResultEntity.java b/src/main/java/com/galaxy/tsg/pojo/ByteResultEntity.java index 4b0849e..6771bc7 100644 --- a/src/main/java/com/galaxy/tsg/pojo/byteResultEntity.java +++ b/src/main/java/com/galaxy/tsg/pojo/ByteResultEntity.java @@ -1,6 +1,6 @@ package com.galaxy.tsg.pojo; -public class byteResultEntity implements Comparable<byteResultEntity> { +public class ByteResultEntity implements Comparable<ByteResultEntity> { @@ -191,7 +191,7 @@ public class byteResultEntity implements Comparable<byteResultEntity> { } @Override - public int compareTo(byteResultEntity per) { + public int compareTo(ByteResultEntity per) { if((this.out_bytes+this.in_bytes)>=(per.out_bytes+per.in_bytes)){ return 1 ; }else{ diff --git a/src/main/java/com/galaxy/tsg/pojo/metricsEntity.java b/src/main/java/com/galaxy/tsg/pojo/MetricsEntity.java index 09fe562..c0e4b31 100644 --- a/src/main/java/com/galaxy/tsg/pojo/metricsEntity.java +++ b/src/main/java/com/galaxy/tsg/pojo/MetricsEntity.java @@ -2,7 +2,7 @@ package com.galaxy.tsg.pojo; import java.util.Map; -public class metricsEntity { +public class MetricsEntity { private String name; private Map<String,String> tags; private Map<String,Long> fields; diff --git a/src/main/java/com/galaxy/tsg/pojo/packetResultEntity.java b/src/main/java/com/galaxy/tsg/pojo/PacketResultEntity.java index cc722b5..2bae32a 100644 --- a/src/main/java/com/galaxy/tsg/pojo/packetResultEntity.java +++ b/src/main/java/com/galaxy/tsg/pojo/PacketResultEntity.java @@ -1,6 +1,6 @@ package com.galaxy.tsg.pojo; -public class packetResultEntity implements Comparable<packetResultEntity>, Cloneable { +public class PacketResultEntity implements Comparable<PacketResultEntity>, Cloneable { private long timestamp_ms; @@ -189,7 +189,7 @@ public class packetResultEntity implements Comparable<packetResultEntity>, Clone } @Override - public int compareTo(packetResultEntity per) { + public int compareTo(PacketResultEntity per) { if((this.out_pkts+this.in_pkts)>=(per.out_pkts+per.in_pkts)){ return 1 ; }else{ diff --git a/src/main/java/com/galaxy/tsg/pojo/ResultEntity.java b/src/main/java/com/galaxy/tsg/pojo/ResultEntity.java new file mode 100644 index 0000000..710fba4 --- /dev/null +++ b/src/main/java/com/galaxy/tsg/pojo/ResultEntity.java @@ -0,0 +1,56 @@ +package com.galaxy.tsg.pojo; + +public class ResultEntity { + + + + private String order_by; + private Long stat_time; + + private SessionResultEntity sessionResultEntity; + private PacketResultEntity packetResultEntity; + private ByteResultEntity byteResultEntity; + + + public String getOrder_by() { + return order_by; + } + + + + public void setOrder_by(String order_by) { + this.order_by = order_by; + } + + public Long getStat_time() { + return stat_time; + } + + public void setStat_time(Long stat_time) { + this.stat_time = stat_time; + } + + public SessionResultEntity getSessionResultEntity() { + return sessionResultEntity; + } + + public void setSessionResultEntity(SessionResultEntity sessionResultEntity) { + this.sessionResultEntity = sessionResultEntity; + } + + public PacketResultEntity getPacketResultEntity() { + return packetResultEntity; + } + + public void setPacketResultEntity(PacketResultEntity packetResultEntity) { + this.packetResultEntity = packetResultEntity; + } + + public ByteResultEntity getByteResultEntity() { + return byteResultEntity; + } + + public void setByteResultEntity(ByteResultEntity byteResultEntity) { + this.byteResultEntity = byteResultEntity; + } +} diff --git a/src/main/java/com/galaxy/tsg/pojo/SessionEntity.java b/src/main/java/com/galaxy/tsg/pojo/SessionEntity.java new file mode 100644 index 0000000..83bd02d --- /dev/null +++ b/src/main/java/com/galaxy/tsg/pojo/SessionEntity.java @@ -0,0 +1,200 @@ +package com.galaxy.tsg.pojo; + +import java.io.Serializable; + +public class SessionEntity implements Serializable{ + + public String client_ip ; + public String app ; + public long recv_time ; + public String server_fqdn; + public long flags; + public String decoded_as ; + public String server_ip ; + public String http_host ; + public String ssl_sni ; + public String dtls_sni ; + public String quic_sni ; + public long vsys_id ; + public String device_group ; + public String device_id ; + public String data_center; + + public String subscriber_id; + public long sent_pkts; + public long received_pkts; + public long sent_bytes ; + public long received_bytes; + + + public SessionEntity() { + } + + public String getDevice_id() { + return device_id; + } + + public void setDevice_id(String device_id) { + this.device_id = device_id; + } + + public String getClient_ip() { + return client_ip; + } + + public void setClient_ip(String client_ip) { + this.client_ip = client_ip; + } + + public String getApp() { + return app; + } + + public void setApp(String common_app_label) { + this.app = app; + } + + public long getRecv_time() { + return recv_time; + } + + public void setRecv_time(long recv_time) { + this.recv_time = recv_time; + } + + public String getDecoded_as() { + return decoded_as; + } + + public void setDecoded_as(String decoded_as) { + this.decoded_as = decoded_as; + } + + public String getServer_ip() { + return server_ip; + } + + public void setServer_ip(String server_ip) { + this.server_ip = server_ip; + } + + + public String getHttp_host() { + return http_host; + } + + public void setHttp_host(String http_host) { + this.http_host = http_host; + } + + public String getSsl_sni() { + return ssl_sni; + } + + public void setSsl_sni(String ssl_sni) { + this.ssl_sni = ssl_sni; + } + + public String getDtls_sni() { + return dtls_sni; + } + + public void setDtls_sni(String dtls_sni) { + this.dtls_sni = dtls_sni; + } + + public String getQuic_sni() { + return quic_sni; + } + + public void setQuic_sni(String quic_sni) { + this.quic_sni = quic_sni; + } + + public long getVsys_id() { + return vsys_id; + } + + public void setVsys_id(long common_vsys_id) { + this.vsys_id = vsys_id; + } + + public String getDevice_group() { + return device_group; + } + + public void setDevice_group(String device_group) { + this.device_group = device_group; + } + + + public String getData_center() { + return data_center; + } + + public void setData_center(String data_center) { + this.data_center = data_center; + } + + + + public String getSubscriber_id() { + return subscriber_id; + } + + public void setSubscriber_id(String subscriber_id) { + this.subscriber_id = subscriber_id; + } + + + public long getSent_pkts() { + return sent_pkts; + } + + public void setSent_pkts(long sent_pkts) { + this.sent_pkts = sent_pkts; + } + + public long getReceived_pkts() { + return received_pkts; + } + + public void setReceived_pkts(long received_pkts) { + this.received_pkts = received_pkts; + } + + public long getSent_bytes() { + return sent_bytes; + } + + public void setSent_bytes(long sent_bytes) { + this.sent_bytes = sent_bytes; + } + + public long getReceived_bytes() { + return received_bytes; + } + + public void setReceived_bytes(long received_bytes) { + this.received_bytes = received_bytes; + } + + + public String getServer_fqdn() { + return server_fqdn; + } + + public void setServer_fqdn(String server_fqdn) { + this.server_fqdn = server_fqdn; + } + + public long getFlags() { + return flags; + } + + public void setFlags(long flags) { + this.flags = flags; + } + + +} diff --git a/src/main/java/com/galaxy/tsg/pojo/sessionResultEntity.java b/src/main/java/com/galaxy/tsg/pojo/SessionResultEntity.java index 63a3ec8..e368a96 100644 --- a/src/main/java/com/galaxy/tsg/pojo/sessionResultEntity.java +++ b/src/main/java/com/galaxy/tsg/pojo/SessionResultEntity.java @@ -1,6 +1,6 @@ package com.galaxy.tsg.pojo; -public class sessionResultEntity implements Comparable<sessionResultEntity> { +public class SessionResultEntity implements Comparable<SessionResultEntity> { @@ -189,7 +189,7 @@ public class sessionResultEntity implements Comparable<sessionResultEntity> { } @Override - public int compareTo(sessionResultEntity per) { + public int compareTo(SessionResultEntity per) { if(this.sessions>=per.sessions){ return 1 ; }else{ diff --git a/src/main/java/com/galaxy/tsg/pojo/transformEntity.java b/src/main/java/com/galaxy/tsg/pojo/TransformEntity.java index 4759e63..f1dcb5e 100644 --- a/src/main/java/com/galaxy/tsg/pojo/transformEntity.java +++ b/src/main/java/com/galaxy/tsg/pojo/TransformEntity.java @@ -2,32 +2,32 @@ package com.galaxy.tsg.pojo; import java.io.Serializable; -public class transformEntity implements Serializable { - - public int ifError; - public long timestamp; - - - public String fqdn; - public String server_ip ; - public String domain ; - public long vsys_id ; - public String device_group ; - public String client_ip ; - public String device_id; - public String data_center; - public String internal_ip; - public String external_ip; - public String subscriber_id; - public long sessions; - public long out_bytes; - public long in_bytes; - public long out_pkts ; - public long in_pkts ; - public String key_by; - public String l4_protocol; - - public transformEntity() { +public class TransformEntity implements Serializable,Cloneable { + + private int ifError; + private long timestamp; + + + private String fqdn; + private String server_ip ; + private String domain ; + private long vsys_id ; + private String device_group ; + private String client_ip ; + private String device_id; + private String data_center; + private String internal_ip; + private String external_ip; + private String subscriber_id; + private long sessions; + private long out_bytes; + private long in_bytes; + private long out_pkts ; + private long in_pkts ; + private String key_by; + private String l4_protocol; + + public TransformEntity() { } public String getL4_protocol() { @@ -191,4 +191,9 @@ public class transformEntity implements Serializable { public void setKey_by(String key_by) { this.key_by = key_by; } + + @Override + public Object clone() throws CloneNotSupportedException { + return super.clone(); + } } diff --git a/src/main/java/com/galaxy/tsg/pojo/resultEntity.java b/src/main/java/com/galaxy/tsg/pojo/resultEntity.java deleted file mode 100644 index 0593d8e..0000000 --- a/src/main/java/com/galaxy/tsg/pojo/resultEntity.java +++ /dev/null @@ -1,56 +0,0 @@ -package com.galaxy.tsg.pojo; - -public class resultEntity { - - - - private String order_by; - private Long stat_time; - - private com.galaxy.tsg.pojo.sessionResultEntity sessionResultEntity; - private com.galaxy.tsg.pojo.packetResultEntity packetResultEntity; - private com.galaxy.tsg.pojo.byteResultEntity byteResultEntity; - - - public String getOrder_by() { - return order_by; - } - - - - public void setOrder_by(String order_by) { - this.order_by = order_by; - } - - public Long getStat_time() { - return stat_time; - } - - public void setStat_time(Long stat_time) { - this.stat_time = stat_time; - } - - public com.galaxy.tsg.pojo.sessionResultEntity getSessionResultEntity() { - return sessionResultEntity; - } - - public void setSessionResultEntity(com.galaxy.tsg.pojo.sessionResultEntity sessionResultEntity) { - this.sessionResultEntity = sessionResultEntity; - } - - public com.galaxy.tsg.pojo.packetResultEntity getPacketResultEntity() { - return packetResultEntity; - } - - public void setPacketResultEntity(com.galaxy.tsg.pojo.packetResultEntity packetResultEntity) { - this.packetResultEntity = packetResultEntity; - } - - public com.galaxy.tsg.pojo.byteResultEntity getByteResultEntity() { - return byteResultEntity; - } - - public void setByteResultEntity(com.galaxy.tsg.pojo.byteResultEntity byteResultEntity) { - this.byteResultEntity = byteResultEntity; - } -} diff --git a/src/main/java/com/galaxy/tsg/pojo/sessionEntity.java b/src/main/java/com/galaxy/tsg/pojo/sessionEntity.java deleted file mode 100644 index 73e2c29..0000000 --- a/src/main/java/com/galaxy/tsg/pojo/sessionEntity.java +++ /dev/null @@ -1,204 +0,0 @@ -package com.galaxy.tsg.pojo; - -import java.io.Serializable; - -public class sessionEntity implements Serializable { - - public String common_client_ip ; - public String common_app_label ; - public long common_recv_time ; - public String common_server_fqdn; - public long common_flags; - public String common_schema_type ; - public String common_server_ip ; - public String http_domain ; - public long common_vsys_id ; - public String common_device_group ; - public String common_device_id ; - public String common_data_center; - public String common_l4_protocol; - public String common_internal_ip; - public String common_external_ip; - public String common_subscriber_id; - public long common_sessions; - public long common_c2s_pkt_num; - public long common_s2c_pkt_num; - public long common_c2s_byte_num ; - public long common_s2c_byte_num ; - - - public sessionEntity() { - } - - public String getCommon_device_id() { - return common_device_id; - } - - public void setCommon_device_id(String common_device_id) { - this.common_device_id = common_device_id; - } - - public String getCommon_client_ip() { - return common_client_ip; - } - - public void setCommon_client_ip(String common_client_ip) { - this.common_client_ip = common_client_ip; - } - - public String getCommon_app_label() { - return common_app_label; - } - - public void setCommon_app_label(String common_app_label) { - this.common_app_label = common_app_label; - } - - public long getCommon_recv_time() { - return common_recv_time; - } - - public void setCommon_recv_time(long common_recv_time) { - this.common_recv_time = common_recv_time; - } - - public String getCommon_schema_type() { - return common_schema_type; - } - - public void setCommon_schema_type(String common_schema_type) { - this.common_schema_type = common_schema_type; - } - - public String getCommon_server_ip() { - return common_server_ip; - } - - public void setCommon_server_ip(String common_server_ip) { - this.common_server_ip = common_server_ip; - } - - - - public String getHttp_domain() { - return http_domain; - } - - public void setHttp_domain(String http_domain) { - this.http_domain = http_domain; - } - - public long getCommon_vsys_id() { - return common_vsys_id; - } - - public void setCommon_vsys_id(long common_vsys_id) { - this.common_vsys_id = common_vsys_id; - } - - public String getCommon_device_group() { - return common_device_group; - } - - public void setCommon_device_group(String common_device_group) { - this.common_device_group = common_device_group; - } - - - public String getCommon_data_center() { - return common_data_center; - } - - public void setCommon_data_center(String common_data_center) { - this.common_data_center = common_data_center; - } - - public String getCommon_l4_protocol() { - return common_l4_protocol; - } - - public void setCommon_l4_protocol(String common_l4_protocol) { - this.common_l4_protocol = common_l4_protocol; - } - - public String getCommon_internal_ip() { - return common_internal_ip; - } - - public void setCommon_internal_ip(String common_internal_ip) { - this.common_internal_ip = common_internal_ip; - } - - public String getCommon_external_ip() { - return common_external_ip; - } - - public void setCommon_external_ip(String common_external_ip) { - this.common_external_ip = common_external_ip; - } - - public String getCommon_subscriber_id() { - return common_subscriber_id; - } - - public void setCommon_subscriber_id(String common_subscriber_id) { - this.common_subscriber_id = common_subscriber_id; - } - - public long getCommon_sessions() { - return common_sessions; - } - - public void setCommon_sessions(long common_sessions) { - this.common_sessions = common_sessions; - } - - public long getCommon_c2s_pkt_num() { - return common_c2s_pkt_num; - } - - public void setCommon_c2s_pkt_num(long common_c2s_pkt_num) { - this.common_c2s_pkt_num = common_c2s_pkt_num; - } - - public long getCommon_s2c_pkt_num() { - return common_s2c_pkt_num; - } - - public void setCommon_s2c_pkt_num(long common_s2c_pkt_num) { - this.common_s2c_pkt_num = common_s2c_pkt_num; - } - - public long getCommon_c2s_byte_num() { - return common_c2s_byte_num; - } - - public void setCommon_c2s_byte_num(long common_c2s_byte_num) { - this.common_c2s_byte_num = common_c2s_byte_num; - } - - public long getCommon_s2c_byte_num() { - return common_s2c_byte_num; - } - - public void setCommon_s2c_byte_num(long common_s2c_byte_num) { - this.common_s2c_byte_num = common_s2c_byte_num; - } - - - public String getCommon_server_fqdn() { - return common_server_fqdn; - } - - public void setCommon_server_fqdn(String common_server_fqdn) { - this.common_server_fqdn = common_server_fqdn; - } - - public long getCommon_flags() { - return common_flags; - } - - public void setCommon_flags(long common_flags) { - this.common_flags = common_flags; - } -} diff --git a/src/main/java/com/galaxy/tsg/selector/GroupBySelector.java b/src/main/java/com/galaxy/tsg/selector/GroupBySelector.java new file mode 100644 index 0000000..7c5f727 --- /dev/null +++ b/src/main/java/com/galaxy/tsg/selector/GroupBySelector.java @@ -0,0 +1,48 @@ +package com.galaxy.tsg.selector; + +import com.galaxy.tsg.pojo.TransformEntity; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple5; + +public class GroupBySelector implements KeySelector<TransformEntity, Tuple5<String, Long, String, String, String>> { + + public String key; + + public GroupBySelector(String key) { + this.key = key; + } + + @Override + public Tuple5<String, Long, String, String, String> getKey(TransformEntity transformEntity) throws Exception { + + Tuple5<String, Long, String, String, String> tuple = null; + transformEntity.setKey_by(key); + switch (key) { + case "client_ip": + tuple = new Tuple5<>(transformEntity.getClient_ip(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id()); + break; + case "server_ip": + tuple = new Tuple5<>(transformEntity.getServer_ip(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id()); + break; + case "internal_ip": + tuple = new Tuple5<>(transformEntity.getInternal_ip(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id()); + break; + case "external_ip": + tuple = new Tuple5<>(transformEntity.getExternal_ip(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id()); + break; + case "server_domain": + tuple = new Tuple5<>(transformEntity.getDomain(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id()); + break; + case "subscriber_id": + tuple = new Tuple5<>(transformEntity.getSubscriber_id(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id()); + break; + case "server_fqdn": + tuple = new Tuple5<>(transformEntity.getFqdn(), transformEntity.getVsys_id(), transformEntity.getDevice_group(), transformEntity.getData_center(), transformEntity.getDevice_id()); + break; + + default: + + } + return tuple; + } +} diff --git a/src/main/java/com/galaxy/tsg/selector/OrderBySelector.java b/src/main/java/com/galaxy/tsg/selector/OrderBySelector.java new file mode 100644 index 0000000..cf3b23a --- /dev/null +++ b/src/main/java/com/galaxy/tsg/selector/OrderBySelector.java @@ -0,0 +1,13 @@ +package com.galaxy.tsg.selector; + +import com.galaxy.tsg.pojo.ResultEntity; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple1; + +public class OrderBySelector implements KeySelector<ResultEntity, Tuple1<String>> { + + @Override + public Tuple1<String> getKey(ResultEntity entity) throws Exception { + return new Tuple1<>(entity.getOrder_by()); + } +} diff --git a/src/main/java/com/galaxy/tsg/util/KafkaUtils.java b/src/main/java/com/galaxy/tsg/util/KafkaUtils.java new file mode 100644 index 0000000..9aae43f --- /dev/null +++ b/src/main/java/com/galaxy/tsg/util/KafkaUtils.java @@ -0,0 +1,114 @@ +package com.galaxy.tsg.util; + +import com.galaxy.tsg.config.CommonConfig; +import com.galaxy.tsg.pojo.SessionEntity; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; +import org.apache.kafka.common.config.SslConfigs; + +import java.util.Optional; +import java.util.Properties; + +import static com.galaxy.tsg.config.CommonConfig.KAFKA_CONSUMER_TOPIC; +import static com.galaxy.tsg.config.CommonConfig.KAFKA_PRODUCER_TOPIC; + +public class KafkaUtils { + + + public static Properties getKafkaSourceProperty(Configuration configuration) { + Properties properties = new Properties(); + properties.setProperty("group.id", configuration.getString(CommonConfig.KAFKA_CONSUMER_GROUP_ID)); + properties.setProperty("bootstrap.servers",configuration.getString(CommonConfig.KAFKA_CONSUMER_BROKER)); + properties.setProperty("session.timeout.ms", configuration.getString(CommonConfig.KAFKA_CONSUMER_SESSION_TIMEOUT_MS)); + properties.setProperty("max.poll.records", configuration.getString(CommonConfig.KAFKA_CONSUMER_MAX_POLL_RECORD)); + properties.setProperty("max.partition.fetch.bytes", configuration.getString(CommonConfig.KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES)); + + + switch (configuration.getInteger(CommonConfig.KAFKA_CONSUMER_SECURITY)) { + case 1: + properties.put("security.protocol", "SSL"); + properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); + properties.put("ssl.keystore.location", configuration.getString(CommonConfig.TOOLS_CONSUMER_LIBRARY) + "keystore.jks"); + properties.put("ssl.keystore.password", configuration.getString(CommonConfig.KAFKA_CONSUMER_PIN)); + properties.put("ssl.truststore.location", configuration.getString(CommonConfig.TOOLS_CONSUMER_LIBRARY) + "truststore.jks"); + properties.put("ssl.truststore.password", configuration.getString(CommonConfig.KAFKA_CONSUMER_PIN)); + properties.put("ssl.key.password", configuration.getString(CommonConfig.KAFKA_CONSUMER_PIN)); + break; + case 2: + properties.put("security.protocol", "SASL_PLAINTEXT"); + properties.put("sasl.mechanism", "PLAIN"); + properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + + configuration.getString(CommonConfig.KAFKA_CONSUMER_USER) + " password=" + configuration.getString(CommonConfig.KAFKA_CONSUMER_PIN) + ";"); + break; + default: + } + + return properties; + } + private static Properties getKafkaSinkProperty(Configuration configuration) { + Properties properties = new Properties(); + properties.setProperty("bootstrap.servers",configuration.getString(CommonConfig.KAFKA_PRODUCER_BROKER)); + + properties.put("acks", "1"); + properties.put("retries", configuration.getString(CommonConfig.KAFKA_PRODUCER_RETRIES)); + properties.put("linger.ms", configuration.getString(CommonConfig.KAFKA_PRODUCER_LINGER_MS)); + properties.put("request.timeout.ms", configuration.getString(CommonConfig.KAFKA_PRODUCER_REQUEST_TIMEOUT_MS)); + properties.put("batch.size", configuration.getString(CommonConfig.KAFKA_PRODUCER_BATCH_SIZE)); + properties.put("buffer.memory", configuration.getString(CommonConfig.KAFKA_PRODUCER_BUFFER_MEMORY)); + properties.put("max.request.size", configuration.getString(CommonConfig.KAFKA_PRODUCER_MAX_REQUEST_SIZE)); + properties.put("compression.type", configuration.getString(CommonConfig.KAFKA_PRODUCER_COMPRESSION_TYPE)); + switch (configuration.getInteger(CommonConfig.KAFKA_PRODUCER_SECURITY)) { + case 1: + properties.put("security.protocol", "SSL"); + properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); + properties.put("ssl.keystore.location", configuration.getString(CommonConfig.TOOLS_PRODUCER_LIBRARY) + "keystore.jks"); + properties.put("ssl.keystore.password", configuration.getString(CommonConfig.KAFKA_PRODUCER_PIN)); + properties.put("ssl.truststore.location", configuration.getString(CommonConfig.TOOLS_PRODUCER_LIBRARY) + "truststore.jks"); + properties.put("ssl.truststore.password", configuration.getString(CommonConfig.KAFKA_PRODUCER_PIN)); + properties.put("ssl.key.password", configuration.getString(CommonConfig.KAFKA_PRODUCER_PIN)); + break; + case 2: + properties.put("security.protocol", "SASL_PLAINTEXT"); + properties.put("sasl.mechanism", "PLAIN"); + properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + + configuration.getString(CommonConfig.KAFKA_PRODUCER_USER) + " password=" + configuration.getString(CommonConfig.KAFKA_PRODUCER_PIN) + ";"); + break; + default: + } + return properties; + } + + public static FlinkKafkaConsumer<SessionEntity> getKafkaConsumer(Configuration configuration) { + FlinkKafkaConsumer<SessionEntity> kafkaConsumer = new FlinkKafkaConsumer<>(configuration.getString(KAFKA_CONSUMER_TOPIC), + new TimestampDeserializationSchema(), getKafkaSourceProperty(configuration)); + + kafkaConsumer.setCommitOffsetsOnCheckpoints(true); + kafkaConsumer.setStartFromGroupOffsets(); + + return kafkaConsumer; + } + /* public static FlinkKafkaConsumer<String> getKafkaConsumerLists(List<String> topic) { + FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, + new SimpleStringSchema(), getKafkaSourceProperty(configuration)); + + kafkaConsumer.setCommitOffsetsOnCheckpoints(true); + kafkaConsumer.setStartFromGroupOffsets(); + + return kafkaConsumer; + }*/ + public static SinkFunction<String> getKafkaSink(Configuration configuration) { + FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<String>( + configuration.getString(KAFKA_PRODUCER_TOPIC), + new SimpleStringSchema(), + getKafkaSinkProperty(configuration), + Optional.empty() + ); + flinkKafkaProducer.setLogFailuresOnly(false); + return flinkKafkaProducer; + } + + +} diff --git a/src/main/java/com/galaxy/tsg/util/TimestampDeserializationSchema.java b/src/main/java/com/galaxy/tsg/util/TimestampDeserializationSchema.java new file mode 100644 index 0000000..c811094 --- /dev/null +++ b/src/main/java/com/galaxy/tsg/util/TimestampDeserializationSchema.java @@ -0,0 +1,48 @@ +package com.galaxy.tsg.util; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson2.JSON; +import com.galaxy.tsg.pojo.SessionEntity; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +/** + * @author qidaijie + * @version 2022/3/89:42 + */ +public class TimestampDeserializationSchema implements KafkaDeserializationSchema { + private static final Log logger = LogFactory.get(); + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(new TypeHint<SessionEntity>() {}); + } + + @Override + public boolean isEndOfStream(Object nextElement) { + return false; + } + + @Override + @SuppressWarnings("unchecked") + public SessionEntity deserialize(ConsumerRecord record) throws Exception { + if (record != null) { + try { + + + SessionEntity sessionEntity = JSON.parseObject((byte[]) record.value(), SessionEntity.class); + sessionEntity.setRecv_time(record.timestamp()); + + return sessionEntity; + } catch (RuntimeException e) { + logger.error( + "KafkaConsumer Deserialize failed,The exception is : " + e.getMessage()); + e.printStackTrace(); + } + } + return new SessionEntity(); + } +} diff --git a/src/main/java/com/galaxy/tsg/util/kafkaUtils.java b/src/main/java/com/galaxy/tsg/util/kafkaUtils.java deleted file mode 100644 index 5374b8b..0000000 --- a/src/main/java/com/galaxy/tsg/util/kafkaUtils.java +++ /dev/null @@ -1,109 +0,0 @@ -package com.galaxy.tsg.util; - -import com.galaxy.tsg.config.commonConfig; -import org.apache.flink.api.common.serialization.SimpleStringSchema; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; -import org.apache.kafka.common.config.SslConfigs; - -import java.util.List; -import java.util.Optional; -import java.util.Properties; - -public class kafkaUtils { - - - public static Properties getKafkaSourceProperty() { - Properties properties = new Properties(); - properties.setProperty("group.id", commonConfig.KAFKA_CONSUMER_GROUP_ID); - properties.setProperty("bootstrap.servers", commonConfig.KAFKA_CONSUMER_BROKER); - properties.setProperty("session.timeout.ms", commonConfig.KAFKA_CONSUMER_SESSION_TIMEOUT_MS); - properties.setProperty("max.poll.records", commonConfig.KAFKA_CONSUMER_MAX_POLL_RECORD); - properties.setProperty("max.partition.fetch.bytes", commonConfig.KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES); - - switch (commonConfig.KAFKA_CONSUMER_SECURITY) { - case 1: - properties.put("security.protocol", "SSL"); - properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); - properties.put("ssl.keystore.location", commonConfig.TOOLS_CONSUMER_LIBRARY + "keystore.jks"); - properties.put("ssl.keystore.password", commonConfig.KAFKA_CONSUMER_PIN); - properties.put("ssl.truststore.location", commonConfig.TOOLS_CONSUMER_LIBRARY + "truststore.jks"); - properties.put("ssl.truststore.password", commonConfig.KAFKA_CONSUMER_PIN); - properties.put("ssl.key.password", commonConfig.KAFKA_CONSUMER_PIN); - break; - case 2: - properties.put("security.protocol", "SASL_PLAINTEXT"); - properties.put("sasl.mechanism", "PLAIN"); - properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" - + commonConfig.KAFKA_CONSUMER_USER + " password=" + commonConfig.KAFKA_CONSUMER_PIN + ";"); - break; - default: - } - - return properties; - } - - private static Properties getKafkaSinkProperty() { - Properties properties = new Properties(); - properties.setProperty("bootstrap.servers", commonConfig.KAFKA_PRODUCER_BROKER); - - properties.put("acks", "1"); - properties.put("retries", commonConfig.KAFKA_PRODUCER_RETRIES); - properties.put("linger.ms", commonConfig.KAFKA_PRODUCER_LINGER_MS); - properties.put("request.timeout.ms", commonConfig.KAFKA_PRODUCER_REQUEST_TIMEOUT_MS); - properties.put("batch.size", commonConfig.KAFKA_PRODUCER_BATCH_SIZE); - properties.put("buffer.memory", commonConfig.KAFKA_PRODUCER_BUFFER_MEMORY); - properties.put("max.request.size", commonConfig.KAFKA_PRODUCER_MAX_REQUEST_SIZE); - properties.put("compression.type", commonConfig.KAFKA_PRODUCER_COMPRESSION_TYPE); - - switch (commonConfig.KAFKA_PRODUCER_SECURITY) { - case 1: - properties.put("security.protocol", "SSL"); - properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); - properties.put("ssl.keystore.location", commonConfig.TOOLS_PRODUCER_LIBRARY + "keystore.jks"); - properties.put("ssl.keystore.password", commonConfig.KAFKA_PRODUCER_PIN); - properties.put("ssl.truststore.location", commonConfig.TOOLS_PRODUCER_LIBRARY + "truststore.jks"); - properties.put("ssl.truststore.password", commonConfig.KAFKA_PRODUCER_PIN); - properties.put("ssl.key.password", commonConfig.KAFKA_PRODUCER_PIN); - break; - case 2: - properties.put("security.protocol", "SASL_PLAINTEXT"); - properties.put("sasl.mechanism", "PLAIN"); - properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" - + commonConfig.KAFKA_PRODUCER_USER + " password=" + commonConfig.KAFKA_PRODUCER_PIN + ";"); - break; - default: - } - return properties; - } - - public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic) { - FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, - new SimpleStringSchema(), getKafkaSourceProperty()); - - kafkaConsumer.setCommitOffsetsOnCheckpoints(true); - kafkaConsumer.setStartFromGroupOffsets(); - - return kafkaConsumer; - } - public static FlinkKafkaConsumer<String> getKafkaConsumerLists(List<String> topic) { - FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, - new SimpleStringSchema(), getKafkaSourceProperty()); - - kafkaConsumer.setCommitOffsetsOnCheckpoints(true); - kafkaConsumer.setStartFromGroupOffsets(); - - return kafkaConsumer; - } - public static SinkFunction<String> getKafkaSink(String topic) { - return new FlinkKafkaProducer<String>( - topic, - new SimpleStringSchema(), - getKafkaSinkProperty(), - Optional.empty() - ); - } - - -} diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index 4a28a54..ca8057f 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -1,37 +1,37 @@ #--------------------------------Kafka��������Ϣ------------------------------# #kafka�ĵ�ַ��Ϣ -kafka.consumer.broker=192.168.44.11:9092 +kafka.consumer.broker=192.168.54.241:9092 #kafka ��������topic -kafka.consumer.topic=SESSION-RECORD-COMPLETED +kafka.consumer.topic=SESSION-RECORD #���������� -kafka.consumer.group.id=topn-metrics-job-20231101-t1 +kafka.consumer.group.id=topn-metrics-job-20231101-t1-t #--------------------------------Kafka��������Ϣ------------------------------# #kafka�ĵ�ַ��Ϣ -kafka_producer_broker=192.168.44.12:9092 +kafka.producer.broker=192.168.44.12:9094 kafka.producer.topic=TRAFFIC-TOP-METRIC #--------------------------------topology����------------------------------# #�������� -job.name=TOPN-METRICS-JOB +job.name=agg_session_record_topn #source���ж� -kafka.consumer.parallelism=1 +kafka.consumer.parallelism=3 #�����ж� -task.parallelism=1 +task.parallelism=3 #���������ж� -orderby.parallelism=1 +orderby.parallelism=3 #��Ⲣ�жȣ�ͨ������orderby.parallelism -sink.parallelism=1 +sink.parallelism=3 #�����ӳٵȴ�ʱ�䵥λ�� -watermark.time=60 +watermark.time=90 #top������� top.limit=10000 @@ -41,13 +41,7 @@ window.time.minute=5 #--------------------------------Kafka��������------------------------------# #kafka source poll -kafka.consumer.max.poll.records=3000 -#kafka source connection timeout -kafka.consumer.session.timeout.ms=60000 - -#kafka source poll bytes -kafka.consumer.max.partition.fetch.bytes=31457280 #kafka�������Ƿ�����ȫ��֤ 0������ 1SSL 2 SASL kafka.consumer.security=0 @@ -62,7 +56,7 @@ kafka.consumer.pin=galaxy2019 tools.consumer.library=/home/bigdata/topology/dat/ #kafka�������Ƿ�����ȫ��֤ 0������ 1SSL 2 SASL -kafka.producer.security=0 +kafka.producer.security=2 #kafka SASL��֤�û��� kafka.producer.user=admin @@ -73,23 +67,5 @@ kafka.producer.pin=galaxy2019 #1SSL��Ҫ tools.producer.library=/home/bigdata/topology/dat/ -#producer���ԵĴ������� -kafka.producer.retries=1 - -#���ĺ������˵һ��Batch������֮��������ã��������Batch��û��д���������뷢�ͳ�ȥ�� -kafka.producer.linger.ms=1 - -#����ڳ�ʱ֮ǰδ�յ���Ӧ���ͻ��˽��ڱ�Ҫʱ���·������� -kafka.producer.request.timeout.ms=30000 - -#producer���ǰ���batch���з��͵�,���δ�С��Ĭ��:16384 -kafka.producer.batch.size=262144 - -#Producer�����ڻ�����Ϣ�Ļ�������С -kafka.producer.buffer.memory=134217728 -#�������������ÿ�η���Kafka���������������С,Ĭ��1048576 -kafka.producer.max.request.size=10485760 -#����kafkaѹ�����ͣ�Ĭ�ϲ����� -kafka.producer.compression.type=none diff --git a/src/test/java/com/galaxy/tsg/catalog/CatalogTest.java b/src/test/java/com/galaxy/tsg/catalog/CatalogTest.java deleted file mode 100644 index bb7ef2c..0000000 --- a/src/test/java/com/galaxy/tsg/catalog/CatalogTest.java +++ /dev/null @@ -1,7 +0,0 @@ -package com.galaxy.tsg.catalog; - -public class CatalogTest { - public static void main(String[] args) { - - } -} diff --git a/src/test/java/com/galaxy/tsg/top/TopTest.java b/src/test/java/com/galaxy/tsg/top/TopTest.java new file mode 100644 index 0000000..1acd6be --- /dev/null +++ b/src/test/java/com/galaxy/tsg/top/TopTest.java @@ -0,0 +1,110 @@ +package com.galaxy.tsg.top; + +import com.alibaba.fastjson2.JSON; +import com.galaxy.tsg.function.FlatMapFunction; +import com.galaxy.tsg.pojo.SessionEntity; +import com.galaxy.tsg.pojo.TransformEntity; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.*; + +public class TopTest { + @ClassRule + public static MiniClusterWithClientResource flinkCluster = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberSlotsPerTaskManager(1) + .setNumberTaskManagers(1) + .build()); + + + + + @Test + public void testIncrementPipeline() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + + String jsonString1 = "{\"client_ip\":\"192.168.1.1\",\"app\":\"app1\",\"recv_time\":0,\"server_fqdn\":\"tuplogpublic.bangcdn.net\",\"flags\":24,\"decoded_as\":\"HTTP\",\"server_ip\":\"192.168.2.1\",\"http_host\":\"tuplogpublic.bangcdn.net\",\"vsys_id\":1,\"device_group\":\"group1\",\"device_id\":\"device_id1\",\"subscriber_id\":\"subscriber_id1\",\"data_center\":\"data_center1\",\"sent_pkts\":1,\"received_pkts\":1,\"sent_bytes\":1,\"received_bytes\":1 }"; + String jsonString2 = "{\"client_ip\":\"192.168.1.2\",\"app\":\"app1\",\"recv_time\":0,\"server_fqdn\":\"tuplogpublic.bangcdn.net\",\"flags\":8,\"decoded_as\":\"HTTP\",\"server_ip\":\"192.168.2.2\",\"http_host\":\"tuplogpublic.bangcdn.net\",\"vsys_id\":1,\"device_group\":\"group1\",\"device_id\":\"device_id1\",\"subscriber_id\":\"subscriber_id1\",\"data_center\":\"data_center1\",\"sent_pkts\":2,\"received_pkts\":1,\"sent_bytes\":1,\"received_bytes\":1 }"; + String jsonString3 = "{\"client_ip\":\"192.168.1.3\",\"app\":\"app1\",\"recv_time\":0,\"server_fqdn\":\"tuplogpublic.bangcdn.net\",\"flags\":16,\"decoded_as\":\"HTTP\",\"server_ip\":\"192.168.2.3\",\"http_host\":\"tuplogpublic.bangcdn.net\",\"vsys_id\":1,\"device_group\":\"group1\",\"device_id\":\"device_id1\",\"subscriber_id\":\"subscriber_id1\",\"data_center\":\"data_center1\",\"sent_pkts\":3,\"received_pkts\":1,\"sent_bytes\":1,\"received_bytes\":1 }"; + String jsonString4 = "{\"client_ip\":\"192.168.1.4\",\"app\":\"app1\",\"recv_time\":0,\"server_fqdn\":\"tuplogpublic.bangcdn.net\",\"flags\":1,\"decoded_as\":\"HTTP\",\"server_ip\":\"192.168.2.4\",\"http_host\":\"tuplogpublic.bangcdn.net\",\"vsys_id\":1,\"device_group\":\"group1\",\"device_id\":\"device_id1\",\"subscriber_id\":\"subscriber_id1\",\"data_center\":\"data_center1\",\"sent_pkts\":4,\"received_pkts\":1,\"sent_bytes\":1,\"received_bytes\":1 }"; + + + // configure your test environment + + SessionEntity sessionEntity1 = JSON.parseObject(jsonString1, SessionEntity.class); + SessionEntity sessionEntity2 = JSON.parseObject(jsonString2, SessionEntity.class); + SessionEntity sessionEntity3 = JSON.parseObject(jsonString3, SessionEntity.class); + SessionEntity sessionEntity4 = JSON.parseObject(jsonString4, SessionEntity.class); + + env.setParallelism(1); + // values are collected in a static variable + CollectSink.values.clear(); + + + ParameterTool serviceConfig = ParameterTool.fromPropertiesFile("src\\main\\resources\\common.properties"); + Configuration configurationService = serviceConfig.getConfiguration(); + // global register + env.getConfig().setGlobalJobParameters(configurationService); + + + // create a stream of custom elements and apply transformations + env.fromElements(sessionEntity1,sessionEntity2,sessionEntity3,sessionEntity4) + .flatMap(new FlatMapFunction()) + .addSink(new CollectSink()); + + // execute + env.execute(); + + // verify your results + assertEquals("192.168.2.1", CollectSink.values.get(0).getInternal_ip()); + assertEquals("192.168.1.1", CollectSink.values.get(1).getInternal_ip()); + assertNull(CollectSink.values.get(0).getExternal_ip()); + assertNull(CollectSink.values.get(1).getExternal_ip()); + + + assertEquals("192.168.1.2", CollectSink.values.get(2).getInternal_ip()); + assertEquals("192.168.2.2", CollectSink.values.get(2).getExternal_ip()); + + assertEquals("192.168.2.3", CollectSink.values.get(3).getInternal_ip()); + assertEquals("192.168.1.3", CollectSink.values.get(3).getExternal_ip()); + + assertEquals("192.168.2.4", CollectSink.values.get(4).getExternal_ip()); + assertEquals("192.168.1.4", CollectSink.values.get(5).getExternal_ip()); + assertNull(CollectSink.values.get(4).getInternal_ip()); + assertNull(CollectSink.values.get(5).getInternal_ip()); + assertEquals(6, CollectSink.values.size()); + + assertEquals("bangcdn.net", CollectSink.values.get(0).getDomain()); + + + + } + + // create a testing sink + private static class CollectSink implements SinkFunction<TransformEntity> { + + // must be static + public static final List<TransformEntity> values = Collections.synchronizedList(new ArrayList<>()); + + @Override + public void invoke(TransformEntity value, SinkFunction.Context context) throws Exception { + values.add(value); + } + } + + + +} |
