| field | value | date |
|---|---|---|
| author | qidaijie <[email protected]> | 2021-11-20 11:30:08 +0300 |
| committer | qidaijie <[email protected]> | 2021-11-20 11:30:08 +0300 |
| commit | 2a32156c9eb8205e3c7bad02c3bdacdeabdd9000 (patch) | |
| tree | 8fa68603c24582cb0057dfef2dcb30cdc2bbbf41 | |
| parent | 49f78a2f4941b5d338da6a1001f9d8940f71aef3 (diff) | |
Rename the configuration file properties
Rework the statistics logic into a two-layer window computation
15 files changed, 273 insertions, 124 deletions
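
The substantive change is in StreamAggregateTopology.java below: the single keyed window becomes a two-stage layout, a fixed 5-second pre-aggregation window keyed by common_stream_trace_id, followed by the configurable count.window.time window keyed by the dimension string. A minimal sketch of that shape follows; the `dim|value` payload and the reduce lambdas are placeholders for the project's FirstCountWindowFunction/SecondCountWindowFunction (the real source is Kafka, so this finite demo source may finish before the processing-time windows fire):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TwoStageWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // buffer.timeout from default_config.properties: maximum time between two output flushes
        env.setBufferTimeout(100);

        env.fromElements(
                        Tuple2.of("trace-1", "dim-a|1"),
                        Tuple2.of("trace-2", "dim-a|2"),
                        Tuple2.of("trace-1", "dim-b|3"))
                // stage 1: key by the per-record trace id so pre-aggregation spreads evenly
                .keyBy(t -> t.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // placeholder for FirstCountWindowFunction: partial aggregate per trace id and 5 s window
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + "+" + b.f1))
                // stage 2: re-key the partial aggregates by the dimension part of the payload
                .keyBy(t -> t.f1.split("\\|")[0])
                // count.window.time=15 from service_flow_config.properties
                .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
                // placeholder for SecondCountWindowFunction: merge partials into the final record
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + "&" + b.f1))
                .print();

        env.execute("two-stage-window-sketch");
    }
}
```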
```diff
diff --git a/pom.xml b/pom.xml
@@ -6,7 +6,7 @@
     <groupId>com.zdjizhi</groupId>
     <artifactId>log-olap-analysis-schema</artifactId>
-    <version>211105-flattenSpec</version>
+    <version>211120-hash</version>
     <name>log-olap-analysis-schema</name>
 
     <url>http://www.example.com</url>
@@ -116,7 +116,7 @@
         <dependency>
             <groupId>com.zdjizhi</groupId>
             <artifactId>galaxy</artifactId>
-            <version>1.0.6</version>
+            <version>1.0.7</version>
             <exclusions>
                 <exclusion>
                     <artifactId>slf4j-log4j12</artifactId>
@@ -129,15 +129,6 @@
             </exclusions>
         </dependency>
 
-        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table -->
-        <!--<dependency>-->
-        <!--<groupId>org.apache.flink</groupId>-->
-        <!--<artifactId>flink-table</artifactId>-->
-        <!--<version>${flink.version}</version>-->
-        <!--<type>pom</type>-->
-        <!--<scope>${scope.type}</scope>-->
-        <!--</dependency>-->
-
         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
         <dependency>
             <groupId>org.apache.flink</groupId>
diff --git a/properties/default_config.properties b/properties/default_config.properties
index 2b9bfb1..968c7fb 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -1,29 +1,45 @@
-#number of retries for the producer
+#====================Kafka Consumer====================#
+#kafka source connection timeout
+session.timeout.ms=60000
+
+#kafka source poll
+max.poll.records=3000
+
+#kafka source poll bytes
+max.partition.fetch.bytes=31457280
+#====================Kafka Producer====================#
+#number of retries for the producer
 retries=0
 
-#how long at most a Batch may wait after creation before it must be sent, whether or not it is full
-linger.ms=5
+#how long at most a Batch may wait after creation before it must be sent, whether or not it is full
+linger.ms=10
 
-#if no response is received before the timeout, the client will resend the request when necessary
+#if no response is received before the timeout, the client will resend the request when necessary
 request.timeout.ms=30000
 
-#the producer sends in batches; batch size, default: 16384
+#the producer sends in batches; batch size, default: 16384
 batch.size=262144
 
-#size of the buffer the Producer uses to cache messages
-buffer.memory=67108864
+#size of the buffer the Producer uses to cache messages
+#128M
+buffer.memory=134217728
+
+#maximum size of a single request sent to the Kafka server, default 1048576
+#10M
+max.request.size=10485760
+#====================kafka default====================#
+#kafka source protocol; SSL or SASL
+kafka.source.protocol=SASL
 
-#maximum size of a single request sent to the Kafka server, default 1048576
-max.request.size=5242880
+#kafka sink protocol; SSL or SASL
+kafka.sink.protocol=SASL
 
-#kafka SASL authentication username
+#kafka SASL authentication username
 kafka.user=admin
 
-#kafka SASL and SSL authentication password
+#kafka SASL and SSL authentication password
 kafka.pin=galaxy2019
+#====================Topology Default====================#
 
-#kafka source protocol; SSL or SASL
-kafka.source.protocol=SASL
-
-#kafka sink protocol; SSL or SASL
-kafka.sink.protocol=SASL
\ No newline at end of file
+#maximum time between two outputs (milliseconds)
+buffer.timeout=100
\ No newline at end of file
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 91f018d..2f62434 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,34 +1,33 @@
 #--------------------------------Address configuration------------------------------#
 #management kafka address
-input.kafka.servers=192.168.44.12:9094
+#source.kafka.servers=10.224.11.14:9094,10.224.11.15:9094,10.224.11.16:9094,10.224.11.17:9094,10.224.11.18:9094,10.224.11.19:9094,10.224.11.20:9094,10.224.11.21:9094,10.224.11.22:9094,10.224.11.23:9094
+source.kafka.servers=10.221.12.4:9094
 
 #management output kafka address
-output.kafka.servers=192.168.44.12:9094
+sink.kafka.servers=10.224.11.14:9094,10.224.11.15:9094,10.224.11.16:9094,10.224.11.17:9094,10.224.11.18:9094,10.224.11.19:9094,10.224.11.20:9094,10.224.11.21:9094,10.224.11.22:9094,10.224.11.23:9094
 
 #--------------------------------HTTP------------------------------#
 #kafka certificate path
-tools.library=D:\\K18-Phase2\\tsgSpace\\dat\\tsg\\
+tools.library=D:\\workerspace\\dat\\
 
 #schema location on the gateway
-#schema.http=http://192.168.44.12:9999/metadata/schema/v1/fields/liveChart_interim
-schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/liveChart_session
+schema.http=http://10.224.11.244:9999/metadata/schema/v1/fields/liveChart_session
 
 #gateway APP_ID fetch endpoint
-app.id.http=http://192.168.44.67:9999/open-api/appDicList
+app.id.http=http://10.224.11.244:9999/open-api/appDicList
 
 #--------------------------------Kafka consumer group info------------------------------#
 #kafka topic that data is received from
-input.kafka.topic=test
-#input.kafka.topic=SESSION-RECORD
-#input.kafka.topic=INTERIM-SESSION-RECORD
+source.kafka.topic=SESSION-RECORD
+#source.kafka.topic=test
 
 #output topic for the enriched data
-output.kafka.topic=test-result
+sink.kafka.topic=test-result
 
 #consumer group of the source topic; stores the consumed offsets for this spout id (may be named after the topology) so the next read does not re-consume data
-group.id=liveCharts-session-test-20210811-1
+group.id=mytest-211119-1
 
 #producer compression mode, none or snappy
 producer.kafka.compression.type=none
@@ -39,13 +38,21 @@ producer.ack=1
 
 #--------------------------------Topology configuration------------------------------#
 #consumer parallelism
-consumer.parallelism=1
+source.parallelism=1
 
 #map function parallelism
-parse.parallelism=1
+parse.parallelism=2
+
+#count function parallelism
+first.window.parallelism=2
+
+second.window.parallelism=2
+
+#producer parallelism
+sink.parallelism=1
 
 #app_id refresh interval; 0 means the cache is never refreshed
 app.tick.tuple.freq.secs=0
 
 #aggregation window time
-count.window.time=15
\ No newline at end of file
+count.window.time=15
```
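
StreamAggregateConfig.java (next diff) reads both files above through StreamAggregateConfigurations.getIntProperty/getStringProperty, whose first argument selects the backing file. That helper is not part of this commit; judging by where each key lives, 0 appears to select service_flow_config.properties and 1 default_config.properties. A toy loader written under that assumption:

```java
import java.io.InputStream;
import java.util.Properties;

// Hypothetical stand-in for StreamAggregateConfigurations, which is not shown in
// this commit. Assumption: the integer argument picks the backing file,
// 0 = service_flow_config.properties (per-topology settings) and
// 1 = default_config.properties (shared Kafka defaults), which matches where
// every key used by StreamAggregateConfig actually lives.
public final class ConfigurationsSketch {
    private static final Properties[] FILES = {
            load("service_flow_config.properties"),   // index 0
            load("default_config.properties")         // index 1
    };

    private static Properties load(String name) {
        Properties p = new Properties();
        try (InputStream in = ConfigurationsSketch.class.getClassLoader().getResourceAsStream(name)) {
            if (in != null) {
                p.load(in);
            }
        } catch (Exception e) {
            throw new IllegalStateException("cannot load " + name, e);
        }
        return p;
    }

    public static String getStringProperty(int file, String key) {
        return FILES[file].getProperty(key);
    }

    public static Integer getIntProperty(int file, String key) {
        // throws NumberFormatException if the key is missing or not numeric
        return Integer.valueOf(FILES[file].getProperty(key));
    }
}
```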
```diff
diff --git a/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java
index 3647c5a..a95a508 100644
--- a/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java
+++ b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java
@@ -14,22 +14,23 @@ public class StreamAggregateConfig {
     /**
      * System
      */
-    public static final Integer CONSUMER_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "consumer.parallelism");
+    public static final Integer SOURCE_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "source.parallelism");
     public static final Integer PARSE_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "parse.parallelism");
+    public static final Integer FIRST_WINDOW_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "first.window.parallelism");
+    public static final Integer SECOND_WINDOW_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "second.window.parallelism");
     public static final Integer APP_TICK_TUPLE_FREQ_SECS = StreamAggregateConfigurations.getIntProperty(0, "app.tick.tuple.freq.secs");
     public static final Integer COUNT_WINDOW_TIME = StreamAggregateConfigurations.getIntProperty(0, "count.window.time");
     public static final String TOOLS_LIBRARY = StreamAggregateConfigurations.getStringProperty(0, "tools.library");
+    public static final Integer BUFFER_TIMEOUT = StreamAggregateConfigurations.getIntProperty(1, "buffer.timeout");
+    public static final Integer SINK_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "sink.parallelism");
+
     /**
      * kafka source
      */
-    public static final String INPUT_KAFKA_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "input.kafka.servers");
-    public static final String OUTPUT_KAFKA_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "output.kafka.servers");
-    public static final String GROUP_ID = StreamAggregateConfigurations.getStringProperty(0, "group.id");
-    public static final String OUTPUT_KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "output.kafka.topic");
-    public static final String INPUT_KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "input.kafka.topic");
+    public static final String SINK_KAFKA_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "sink.kafka.servers");
+    public static final String SINK_KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "sink.kafka.topic");
     public static final String PRODUCER_ACK = StreamAggregateConfigurations.getStringProperty(0, "producer.ack");
-    public static final String KAFKA_SOURCE_PROTOCOL = StreamAggregateConfigurations.getStringProperty(1, "kafka.source.protocol");
     public static final String KAFKA_SINK_PROTOCOL = StreamAggregateConfigurations.getStringProperty(1, "kafka.sink.protocol");
     public static final String KAFKA_USER = StreamAggregateConfigurations.getStringProperty(1, "kafka.user");
     public static final String KAFKA_PIN = StreamAggregateConfigurations.getStringProperty(1, "kafka.pin");
@@ -42,6 +43,18 @@ public class StreamAggregateConfig {
 
     /**
+     * kafka source config
+     */
+    public static final String SOURCE_KAFKA_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "source.kafka.servers");
+    public static final String SOURCE_KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "source.kafka.topic");
+    public static final String GROUP_ID = StreamAggregateConfigurations.getStringProperty(0, "group.id");
+    public static final String KAFKA_SOURCE_PROTOCOL = StreamAggregateConfigurations.getStringProperty(1, "kafka.source.protocol");
+    public static final String SESSION_TIMEOUT_MS = StreamAggregateConfigurations.getStringProperty(1, "session.timeout.ms");
+    public static final String MAX_POLL_RECORDS = StreamAggregateConfigurations.getStringProperty(1, "max.poll.records");
+    public static final String MAX_PARTITION_FETCH_BYTES = StreamAggregateConfigurations.getStringProperty(1, "max.partition.fetch.bytes");
+
+
+    /**
      * kafka rate-limiting configuration-20201117
      */
     public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = StreamAggregateConfigurations.getStringProperty(0, "producer.kafka.compression.type");
diff --git a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java
index 3d9eb3e..b726ca8 100644
--- a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java
+++ b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java
@@ -6,7 +6,9 @@ import com.zdjizhi.common.StreamAggregateConfig;
 import com.zdjizhi.utils.functions.*;
 import com.zdjizhi.utils.kafka.Consumer;
 import com.zdjizhi.utils.kafka.Producer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.datastream.WindowedStream;
@@ -31,20 +33,31 @@ public class StreamAggregateTopology {
 
 //            environment.enableCheckpointing(5000);
 
+            //maximum time between two outputs (milliseconds)
+            environment.setBufferTimeout(StreamAggregateConfig.BUFFER_TIMEOUT);
+
             DataStream<String> streamSource = environment.addSource(Consumer.getKafkaConsumer())
-                    .setParallelism(StreamAggregateConfig.CONSUMER_PARALLELISM);
+                    .setParallelism(StreamAggregateConfig.SOURCE_PARALLELISM);
 
-            SingleOutputStreamOperator<Tuple3<String, String, String>> parseDataMap = streamSource.map(new MapParseFunction()).name("ParseDataMap")
+            SingleOutputStreamOperator<Tuple4<String, String, String, String>> parseDataMap = streamSource.map(new MapParseFunction())
+                    .name("ParseDataMap")
                     .setParallelism(StreamAggregateConfig.PARSE_PARALLELISM);
 
-            WindowedStream<Tuple3<String, String, String>, String, TimeWindow> window = parseDataMap.keyBy(new KeyByFunction())
+            WindowedStream<Tuple4<String, String, String, String>, String, TimeWindow> firstWindow = parseDataMap.keyBy(new FirstKeyByFunction())
+                    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
+
+            SingleOutputStreamOperator<Tuple2<String, String>> metricCountWindow = firstWindow.process(new FirstCountWindowFunction())
+                    .name("FirstCountWindow")
+                    .setParallelism(StreamAggregateConfig.FIRST_WINDOW_PARALLELISM);
+
+            WindowedStream<Tuple2<String, String>, String, TimeWindow> secondWindow = metricCountWindow.keyBy(new SecondKeyByFunction())
                     .window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.COUNT_WINDOW_TIME)));
 
-            SingleOutputStreamOperator<String> metricCountWindow = window.process(new CountWindowFunction()).name("MetricCountWindow")
-                    .setParallelism(StreamAggregateConfig.PARSE_PARALLELISM);
+            SingleOutputStreamOperator<String> secondCountWindow = secondWindow.process(new SecondCountWindowFunction())
+                    .name("SecondCountWindow").setParallelism(StreamAggregateConfig.SECOND_WINDOW_PARALLELISM);
 
-            metricCountWindow.flatMap(new ResultFlatMapFunction()).name("ResultFlatMap").setParallelism(StreamAggregateConfig.PARSE_PARALLELISM)
-                    .addSink(Producer.getKafkaProducer()).name("LogSinkKafka").setParallelism(StreamAggregateConfig.PARSE_PARALLELISM);
+            secondCountWindow.flatMap(new ResultFlatMapFunction()).name("ResultFlatMap").setParallelism(StreamAggregateConfig.SINK_PARALLELISM)
+                    .addSink(Producer.getKafkaProducer()).name("LogSinkKafka").setParallelism(StreamAggregateConfig.SINK_PARALLELISM);
 
             environment.execute(args[0]);
         } catch (Exception e) {
diff --git a/src/main/java/com/zdjizhi/utils/functions/CountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java
index 5f22a6b..1f821c1 100644
--- a/src/main/java/com/zdjizhi/utils/functions/CountWindowFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java
@@ -1,12 +1,12 @@
 package com.zdjizhi.utils.functions;
 
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
 import com.zdjizhi.utils.JsonMapper;
 import com.zdjizhi.utils.StringUtil;
 import com.zdjizhi.utils.general.MetricFunctions;
 import com.zdjizhi.utils.json.JsonParseUtil;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
@@ -15,7 +15,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * @author qidaijie
@@ -23,25 +22,25 @@ import java.util.concurrent.ConcurrentHashMap;
  * @Description:
  * @date 2021/7/2113:55
  */
-public class CountWindowFunction extends ProcessWindowFunction<Tuple3<String, String, String>, String, String, TimeWindow> {
-    private static final Logger logger = LoggerFactory.getLogger(CountWindowFunction.class);
+public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple4<String, String, String, String>, Tuple2<String, String>, String, TimeWindow> {
+    private static final Logger logger = LoggerFactory.getLogger(FirstCountWindowFunction.class);
 
     private static HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricsMap();
     private static HashMap<String, String[]> actionMap = JsonParseUtil.getActionMap();
-    private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(32);
+    private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(320);
     private static String resultTimeKey = JsonParseUtil.getTimeKey();
 
     @Override
     @SuppressWarnings("unchecked")
-    public void process(String key, Context context, Iterable<Tuple3<String, String, String>> input, Collector<String> output) {
+    public void process(String key, Context context, Iterable<Tuple4<String, String, String, String>> input, Collector<Tuple2<String, String>> output) {
         try {
-            for (Tuple3<String, String, String> tuple : input) {
-                String label = tuple.f0;
+            for (Tuple4<String, String, String, String> tuple : input) {
+                String label = tuple.f1;
                 //all functions for this protocol in action; fall back to Default if absent
                 String[] metricNames = actionMap.getOrDefault(label, actionMap.get("Default"));
-                String dimensions = tuple.f1;
-                String message = tuple.f2;
-                if (StringUtil.isNotBlank(message)){
+                String dimensions = tuple.f2;
+                String message = tuple.f3;
+                if (StringUtil.isNotBlank(message)) {
                     Map<String, Object> dimensionsObj = (Map<String, Object>) JsonMapper.fromJsonString(dimensions, Map.class);
                     Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
@@ -63,9 +62,8 @@ public class CountWindowFunction extends ProcessWindowFunction<Tuple3<String, St
             for (String countKey : cacheMap.keySet()) {
                 Map<String, Object> resultMap = cacheMap.get(countKey);
                 JsonParseUtil.setValue(resultMap, resultTimeKey, endTime);
-                output.collect(JsonMapper.toJsonString(resultMap));
+                output.collect(new Tuple2<>(countKey, JsonMapper.toJsonString(resultMap)));
             }
-//            cacheMap.clear();
         }
 
         } catch (RuntimeException e) {
diff --git a/src/main/java/com/zdjizhi/utils/functions/FirstKeyByFunction.java b/src/main/java/com/zdjizhi/utils/functions/FirstKeyByFunction.java
new file mode 100644
index 0000000..e02893d
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/FirstKeyByFunction.java
@@ -0,0 +1,22 @@
+package com.zdjizhi.utils.functions;
+
+import cn.hutool.core.util.RandomUtil;
+import com.zdjizhi.common.StreamAggregateConfig;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/7/2112:13
+ */
+public class FirstKeyByFunction implements KeySelector<Tuple4<String, String, String, String>, String> {
+
+    @Override
+    public String getKey(Tuple4<String, String, String, String> value) throws Exception {
+//        //group by the key concatenated from the map
+        return value.f0;
+    }
+}
diff --git a/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java b/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java
deleted file mode 100644
index 0b00b3c..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/KeyByFunction.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.zdjizhi.utils.functions;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple3;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/7/2112:13
- */
-public class KeyByFunction implements KeySelector<Tuple3<String, String, String>, String> {
-
-    @Override
-    public String getKey(Tuple3<String, String, String> value) throws Exception {
-        //group by the key concatenated from the map
-        return value.f1;
-    }
-}
diff --git a/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java b/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java
index 7244b1d..90a21a1 100644
--- a/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/MapParseFunction.java
@@ -1,5 +1,6 @@
 package com.zdjizhi.utils.functions;
 
+import cn.hutool.core.util.RandomUtil;
 import com.jayway.jsonpath.InvalidPathException;
 import com.jayway.jsonpath.JsonPath;
 import com.zdjizhi.common.StreamAggregateConfig;
@@ -8,8 +9,8 @@ import com.zdjizhi.utils.StringUtil;
 import com.zdjizhi.utils.general.ParseFunctions;
 import com.zdjizhi.utils.json.JsonParseUtil;
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -24,8 +25,8 @@ import java.util.Map;
  * @Description:
  * @date 2021/5/2715:01
  */
-public class MapParseFunction implements MapFunction<String, Tuple3<String, String, String>> {
-    private static final Logger logger = LoggerFactory.getLogger(CountWindowFunction.class);
+public class MapParseFunction implements MapFunction<String, Tuple4<String, String, String, String>> {
+    private static final Logger logger = LoggerFactory.getLogger(FirstCountWindowFunction.class);
 
     private static ArrayList<String[]> jobList = JsonParseUtil.getTransformsList();
 
@@ -33,10 +34,11 @@ import java.util.Map;
 
     @Override
     @SuppressWarnings("unchecked")
-    public Tuple3<String, String, String> map(String message) {
+    public Tuple4<String, String, String, String> map(String message) {
         try {
             if (StringUtil.isNotBlank(message)) {
                 Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
+//                String streamTraceId = JsonParseUtil.getString(object, "common_stream_trace_id");
                 Map<String, Object> dimensionsObj = ParseFunctions.transDimensions(dimensionsMap, object);
                 if (ParseFunctions.filterLogs(object)) {
                     for (String[] strings : jobList) {
@@ -74,7 +76,9 @@
                             }
                             break;
                         case "hierarchy":
-                            return new Tuple3<>(JsonParseUtil.getString(object, logsKeyName), JsonMapper.toJsonString(dimensionsObj), JsonMapper.toJsonString(object));
+                            String streamTraceId = JsonParseUtil.getString(object, "common_stream_trace_id");
+//                            RandomUtil.randomInt(0, StreamAggregateConfig.COUNT_PARALLELISM)
+                            return new Tuple4<>(streamTraceId, JsonParseUtil.getString(object, logsKeyName), JsonMapper.toJsonString(dimensionsObj), JsonMapper.toJsonString(object));
                         default:
                             break;
                     }
@@ -83,9 +87,9 @@
             }
         } catch (RuntimeException e) {
             logger.error("Map Parse error,message:" + e);
-            return new Tuple3<>("", "", "");
+            return new Tuple4<>("", "", "", "");
         }
-        return new Tuple3<>("", "", "");
+        return new Tuple4<>("", "", "", "");
     }
 
     /**
diff --git a/src/main/java/com/zdjizhi/utils/functions/MyTimeAssigner.java b/src/main/java/com/zdjizhi/utils/functions/MyTimeAssigner.java
deleted file mode 100644
index 4ba139f..0000000
--- a/src/main/java/com/zdjizhi/utils/functions/MyTimeAssigner.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package com.zdjizhi.utils.functions;
-
-import com.zdjizhi.utils.JsonMapper;
-import com.zdjizhi.utils.json.JsonParseUtil;
-import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
-
-import java.util.Map;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.utils.functions
- * @Description:
- * @date 2021/7/2117:32
- */
-public class MyTimeAssigner implements SerializableTimestampAssigner<String> {
-    @Override
-    public long extractTimestamp(String element, long recordTimestamp) {
-        Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(element, Map.class);
-
-        return JsonParseUtil.getLong(object, "common_end_time");
-    }
-}
diff --git a/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java
index d458984..be78433 100644
--- a/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java
@@ -38,6 +38,7 @@ public class ResultFlatMapFunction implements FlatMapFunction<String, String> {
                 out.collect(JsonMapper.toJsonString(jsonObject));
             }
         }
+//        out.collect(value);
     }
 }
diff --git a/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java
new file mode 100644
index 0000000..455414a
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java
@@ -0,0 +1,101 @@
+package com.zdjizhi.utils.functions;
+
+import com.zdjizhi.utils.JsonMapper;
+import com.zdjizhi.utils.StringUtil;
+import com.zdjizhi.utils.general.MetricFunctions;
+import com.zdjizhi.utils.json.JsonParseUtil;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/7/2113:55
+ */
+public class SecondCountWindowFunction extends ProcessWindowFunction<Tuple2<String, String>, String, String, TimeWindow> {
+    private static final Logger logger = LoggerFactory.getLogger(SecondCountWindowFunction.class);
+
+    private static HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricsMap();
+    private static HashMap<String, String[]> actionMap = JsonParseUtil.getActionMap();
+    private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(320);
+    private static String resultTimeKey = JsonParseUtil.getTimeKey();
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void process(String key, Context context, Iterable<Tuple2<String, String>> input, Collector<String> output) {
+        try {
+            for (Tuple2<String, String> tuple : input) {
+                String dimensions = tuple.f0;
+                String message = tuple.f1;
+                if (StringUtil.isNotBlank(message)) {
+                    Map<String, Object> dimensionsObj = (Map<String, Object>) JsonMapper.fromJsonString(dimensions, Map.class);
+                    Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
+                    String label = JsonParseUtil.getString(object, "protocol_id");
+                    //all functions for this protocol in action; fall back to Default if absent
+                    String[] metricNames = actionMap.getOrDefault(label, actionMap.get("Default"));
+
+                    Map<String, Object> cacheMessage = cacheMap.getOrDefault(dimensions, dimensionsObj);
+                    for (String name : metricNames) {
+                        String[] metrics = metricsMap.get(name);
+                        String function = metrics[0];
+                        functionSet(function, cacheMessage, name, cacheMessage.get(name), JsonParseUtil.getValue(object, name));
+
+                    }
+                    cacheMap.put(dimensions, cacheMessage);
+                }
+            }
+
+            if (!cacheMap.isEmpty()) {
+                Long endTime = context.window().getEnd() / 1000;
+
+                for (String countKey : cacheMap.keySet()) {
+                    Map<String, Object> resultMap = cacheMap.get(countKey);
+                    JsonParseUtil.setValue(resultMap, resultTimeKey, endTime);
+                    output.collect(JsonMapper.toJsonString(resultMap));
+                }
+            }
+
+        } catch (RuntimeException e) {
+            logger.error("windows count error,message:" + e);
+            e.printStackTrace();
+        } finally {
+            cacheMap.clear();
+        }
+    }
+
+    /**
+     * Set of functions that operate on the corresponding fields as described by the schema
+     *
+     * @param function       function name
+     * @param cacheMessage   result set
+     * @param resultName     result field name
+     * @param nameValue      current value
+     * @param fieldNameValue newly added value
+     */
+    private static void functionSet(String function, Map<String, Object> cacheMessage, String resultName, Object nameValue, Object fieldNameValue) {
+        switch (function) {
+            case "sum":
+                cacheMessage.put(resultName, MetricFunctions.longSum(nameValue, fieldNameValue));
+                break;
+            case "count":
+                cacheMessage.put(resultName, MetricFunctions.count(nameValue));
+                break;
+            case "unique_sip_num":
+                //TODO
+                break;
+            case "unique_cip_num":
+                //TODO
+                break;
+            default:
+                break;
+        }
+    }
+}
```
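
SecondCountWindowFunction funnels every metric through functionSet into MetricFunctions.longSum and MetricFunctions.count, whose source is not included in this diff. A toy sketch of the semantics the call site implies (cached value first, newly arrived value second, both null-safe); this is assumed behavior, not the project's actual MetricFunctions:

```java
import java.util.HashMap;
import java.util.Map;

// Toy stand-ins for MetricFunctions.longSum / MetricFunctions.count, matching
// the call site functionSet(function, cache, name, cache.get(name), newValue).
public final class MetricFunctionsSketch {

    // sum: add the incoming value onto the cached running total
    static long longSum(Object current, Object added) {
        long base = current == null ? 0L : Long.parseLong(current.toString());
        long inc = added == null ? 0L : Long.parseLong(added.toString());
        return base + inc;
    }

    // count: one more record seen for this dimension key
    static long count(Object current) {
        return current == null ? 1L : Long.parseLong(current.toString()) + 1L;
    }

    public static void main(String[] args) {
        Map<String, Object> cache = new HashMap<>();
        cache.put("bytes", longSum(cache.get("bytes"), 512));   // -> 512
        cache.put("bytes", longSum(cache.get("bytes"), 1024));  // -> 1536
        cache.put("sessions", count(cache.get("sessions")));    // -> 1
        cache.put("sessions", count(cache.get("sessions")));    // -> 2
        System.out.println(cache);
    }
}
```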
```diff
diff --git a/src/main/java/com/zdjizhi/utils/functions/SecondKeyByFunction.java b/src/main/java/com/zdjizhi/utils/functions/SecondKeyByFunction.java
new file mode 100644
index 0000000..c27bd04
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/functions/SecondKeyByFunction.java
@@ -0,0 +1,24 @@
+package com.zdjizhi.utils.functions;
+
+import cn.hutool.core.util.RandomUtil;
+import com.zdjizhi.common.StreamAggregateConfig;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2021/7/2112:13
+ */
+public class SecondKeyByFunction implements KeySelector<Tuple2<String, String>, String> {
+
+    @Override
+    public String getKey(Tuple2<String, String> value) throws Exception {
+
+        //group by the key concatenated from the map
+        return value.f0;
+    }
+
+}
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
index a24ab4e..9379a1e 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/Consumer.java
@@ -17,11 +17,11 @@ import java.util.Properties;
 public class Consumer {
     private static Properties createConsumerConfig() {
         Properties properties = new Properties();
-        properties.put("bootstrap.servers", StreamAggregateConfig.INPUT_KAFKA_SERVERS);
+        properties.put("bootstrap.servers", StreamAggregateConfig.SOURCE_KAFKA_SERVERS);
         properties.put("group.id", StreamAggregateConfig.GROUP_ID);
-        properties.put("session.timeout.ms", "60000");
-        properties.put("max.poll.records", 3000);
-        properties.put("max.partition.fetch.bytes", 31457280);
+        properties.put("session.timeout.ms", StreamAggregateConfig.SESSION_TIMEOUT_MS);
+        properties.put("max.poll.records", StreamAggregateConfig.MAX_POLL_RECORDS);
+        properties.put("max.partition.fetch.bytes", StreamAggregateConfig.MAX_PARTITION_FETCH_BYTES);
         properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 
@@ -31,7 +31,7 @@ public class Consumer {
     }
 
     public static FlinkKafkaConsumer<String> getKafkaConsumer() {
-        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(StreamAggregateConfig.INPUT_KAFKA_TOPIC,
+        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(StreamAggregateConfig.SOURCE_KAFKA_TOPIC,
                 new SimpleStringSchema(), createConsumerConfig());
 
         kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
diff --git a/src/main/java/com/zdjizhi/utils/kafka/Producer.java b/src/main/java/com/zdjizhi/utils/kafka/Producer.java
index 65330b5..dc407e7 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/Producer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/Producer.java
@@ -19,7 +19,7 @@ public class Producer {
 
     private static Properties createProducerConfig() {
         Properties properties = new Properties();
-        properties.put("bootstrap.servers", StreamAggregateConfig.OUTPUT_KAFKA_SERVERS);
+        properties.put("bootstrap.servers", StreamAggregateConfig.SINK_KAFKA_SERVERS);
         properties.put("acks", StreamAggregateConfig.PRODUCER_ACK);
         properties.put("retries", StreamAggregateConfig.RETRIES);
         properties.put("linger.ms", StreamAggregateConfig.LINGER_MS);
@@ -37,7 +37,7 @@ public class Producer {
 
     public static FlinkKafkaProducer<String> getKafkaProducer() {
         FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
-                StreamAggregateConfig.OUTPUT_KAFKA_TOPIC,
+                StreamAggregateConfig.SINK_KAFKA_TOPIC,
                 new SimpleStringSchema(), createProducerConfig(), Optional.empty());
```
