| field | value | date |
|---|---|---|
| author | qidaijie <[email protected]> | 2022-01-19 08:56:23 +0300 |
| committer | qidaijie <[email protected]> | 2022-01-19 08:56:23 +0300 |
| commit | c6f364d4515d2c3dc8b5abcea0d2db579a7a0e91 | |
| tree | 6d361a333b314ed0a512e0f45e32ec64a7d87adf | |
| parent | 4d10705cdb1e34604a91a5b670a74fb17a0fe00e | |
Fix buffer corruption caused by uneven grouping (GAL-121)
13 files changed, 82 insertions, 89 deletions
```diff
diff --git a/pom.xml b/pom.xml
@@ -6,7 +6,7 @@
   <groupId>com.zdjizhi</groupId>
   <artifactId>log-olap-analysis-schema</artifactId>
-  <version>211120-hash</version>
+  <version>20220113-balance</version>
   <name>log-olap-analysis-schema</name>
   <url>http://www.example.com</url>
diff --git a/properties/default_config.properties b/properties/default_config.properties
index 968c7fb..e5f6b91 100644
--- a/properties/default_config.properties
+++ b/properties/default_config.properties
@@ -42,4 +42,10 @@ kafka.pin=galaxy2019
 
 #====================Topology Default====================#
 #Maximum time between two outputs (in milliseconds)
-buffer.timeout=100
\ No newline at end of file
+buffer.timeout=100
+
+#Random range for the first-stage random grouping
+random.range.num=40
+
+#app_id refresh interval; 0 disables the cache refresh
+app.tick.tuple.freq.secs=0
\ No newline at end of file
```
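The two new keys drive the skew fix. `random.range.num` caps how many salted sub-keys each logical key fans out to in the first grouping stage. As a rough worked example (illustrative numbers, not from the commit): if one hot key receives 1,000,000 records in a 5-second window, a salt range of 40 spreads them across 40 sub-keys of roughly 25,000 records each, and the second stage then merges only the 40 partial aggregates instead of re-processing the raw records on a single subtask.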
```diff
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
index 93384d2..9231c1b 100644
--- a/properties/service_flow_config.properties
+++ b/properties/service_flow_config.properties
@@ -1,27 +1,25 @@
 #--------------------------------Address configuration------------------------------#
 #Source Kafka address
-#source.kafka.servers=10.224.11.14:9094,10.224.11.15:9094,10.224.11.16:9094,10.224.11.17:9094,10.224.11.18:9094,10.224.11.19:9094,10.224.11.20:9094,10.224.11.21:9094,10.224.11.22:9094,10.224.11.23:9094
-source.kafka.servers=10.221.12.4:9094
+source.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
 
 #Sink Kafka address
-sink.kafka.servers=10.224.11.14:9094,10.224.11.15:9094,10.224.11.16:9094,10.224.11.17:9094,10.224.11.18:9094,10.224.11.19:9094,10.224.11.20:9094,10.224.11.21:9094,10.224.11.22:9094,10.224.11.23:9094
+sink.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
 
 #--------------------------------HTTP------------------------------#
 #Kafka certificate path
 tools.library=D:\\workerspace\\dat\\
 
 #Gateway schema location
-schema.http=http://10.224.11.244:9999/metadata/schema/v1/fields/liveChart_session
+schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/liveChart_session
 
 #Gateway APP_ID fetch API
-app.id.http=http://10.224.11.244:9999/open-api/appDicList
+app.id.http=http://192.168.44.67:9999/open-api/appDicList
 
 #--------------------------------Kafka consumer group info------------------------------#
 #Kafka source data topic
-source.kafka.topic=SESSION-RECORD
-#source.kafka.topic=test
+source.kafka.topic=test
 
 #Enriched data output topic
 sink.kafka.topic=test-result
@@ -52,8 +50,8 @@ second.window.parallelism=2
 #Producer parallelism
 sink.parallelism=1
 
-#app_id refresh interval; 0 disables the cache refresh
-app.tick.tuple.freq.secs=0
+#First-stage random pre-aggregation window time
+first.count.window.time=5
 
-#Aggregation window time
-count.window.time=15
+#Second-stage aggregation window time
+second.count.window.time=15
diff --git a/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java
index a95a508..325e04d 100644
--- a/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java
+++ b/src/main/java/com/zdjizhi/common/StreamAggregateConfig.java
@@ -18,11 +18,13 @@ public class StreamAggregateConfig {
     public static final Integer PARSE_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "parse.parallelism");
     public static final Integer FIRST_WINDOW_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "first.window.parallelism");
     public static final Integer SECOND_WINDOW_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "second.window.parallelism");
-    public static final Integer APP_TICK_TUPLE_FREQ_SECS = StreamAggregateConfigurations.getIntProperty(0, "app.tick.tuple.freq.secs");
-    public static final Integer COUNT_WINDOW_TIME = StreamAggregateConfigurations.getIntProperty(0, "count.window.time");
+    public static final Integer APP_TICK_TUPLE_FREQ_SECS = StreamAggregateConfigurations.getIntProperty(1, "app.tick.tuple.freq.secs");
+    public static final Integer FIRST_COUNT_WINDOW_TIME = StreamAggregateConfigurations.getIntProperty(0, "first.count.window.time");
+    public static final Integer SECOND_COUNT_WINDOW_TIME = StreamAggregateConfigurations.getIntProperty(0, "second.count.window.time");
     public static final String TOOLS_LIBRARY = StreamAggregateConfigurations.getStringProperty(0, "tools.library");
     public static final Integer BUFFER_TIMEOUT = StreamAggregateConfigurations.getIntProperty(1, "buffer.timeout");
     public static final Integer SINK_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "sink.parallelism");
+    public static final Integer RANDOM_RANGE_NUM = StreamAggregateConfigurations.getIntProperty(1, "random.range.num");
 
     /**
diff --git a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java
index b6299af..98f5c96 100644
--- a/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java
+++ b/src/main/java/com/zdjizhi/topology/StreamAggregateTopology.java
@@ -7,7 +7,7 @@ import com.zdjizhi.utils.functions.*;
 import com.zdjizhi.utils.kafka.Consumer;
 import com.zdjizhi.utils.kafka.Producer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.datastream.WindowedStream;
@@ -38,19 +38,19 @@ public class StreamAggregateTopology {
         DataStream<String> streamSource = environment.addSource(Consumer.getKafkaConsumer())
                 .setParallelism(StreamAggregateConfig.SOURCE_PARALLELISM);
 
-        SingleOutputStreamOperator<Tuple4<String, String, String, String>> parseDataMap = streamSource.map(new ParseMapFunction())
+        SingleOutputStreamOperator<Tuple3<String, String, String>> parseDataMap = streamSource.map(new ParseMapFunction())
                 .name("ParseDataMap")
                 .setParallelism(StreamAggregateConfig.PARSE_PARALLELISM);
 
-        WindowedStream<Tuple4<String, String, String, String>, String, TimeWindow> firstWindow = parseDataMap.keyBy(new FirstKeyByFunction())
-                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
+        WindowedStream<Tuple3<String, String, String>, String, TimeWindow> firstWindow = parseDataMap.keyBy(new FirstKeyByFunction())
+                .window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.FIRST_COUNT_WINDOW_TIME)));
 
         SingleOutputStreamOperator<Tuple2<String, String>> metricCountWindow = firstWindow.process(new FirstCountWindowFunction())
                 .name("FirstCountWindow")
                 .setParallelism(StreamAggregateConfig.FIRST_WINDOW_PARALLELISM);
 
         WindowedStream<Tuple2<String, String>, String, TimeWindow> secondWindow = metricCountWindow.keyBy(new SecondKeyByFunction())
-                .window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.COUNT_WINDOW_TIME)));
+                .window(TumblingProcessingTimeWindows.of(Time.seconds(StreamAggregateConfig.SECOND_COUNT_WINDOW_TIME)));
 
         SingleOutputStreamOperator<String> secondCountWindow = secondWindow.process(new SecondCountWindowFunction())
                 .name("SecondCountWindow").setParallelism(StreamAggregateConfig.SECOND_WINDOW_PARALLELISM);
```
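The topology change above is a standard two-phase pattern: salt, pre-aggregate in a short window, strip the salt, then aggregate in a longer window. The following is a minimal, self-contained sketch of that pattern against the same Flink APIs the repo uses; the pipeline, class name, and demo source are illustrative, not code from this project:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.concurrent.ThreadLocalRandom;

public class TwoStageAggregationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Bounded demo source; the real job reads from Kafka, and with a
        // bounded source processing-time windows may close before firing.
        env.fromElements("http", "http", "dns", "http")
                // Salt the key: one hot logical key fans out over up to 40 sub-keys.
                .map(s -> Tuple2.of(s + "@" + ThreadLocalRandom.current().nextInt(40), 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                // Stage 1: short pre-aggregation window on the salted key.
                .keyBy(t -> t.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1))
                // Strip the salt so partial results regroup under the real key.
                .map(t -> Tuple2.of(t.f0.substring(0, t.f0.indexOf('@')), t.f1))
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                // Stage 2: longer final window merges at most 40 partials per key.
                .keyBy(t -> t.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1))
                .print();

        env.execute("two-stage-aggregation-sketch");
    }
}
```

The second `keyBy` is what repairs the skew: each hot key reaches the final window as at most `random.range.num` pre-aggregated tuples rather than its full record volume.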
```diff
diff --git a/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java
index be827c1..1aa32c7 100644
--- a/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/FirstCountWindowFunction.java
@@ -6,7 +6,6 @@ import com.zdjizhi.utils.general.MetricFunctions;
 import com.zdjizhi.utils.json.JsonParseUtil;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
@@ -15,7 +14,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * @author qidaijie
@@ -23,7 +21,7 @@ import java.util.concurrent.ConcurrentHashMap;
  * @Description:
  * @date 2021/7/2113:55
  */
-public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple4<String, String, String, String>, Tuple2<String, String>, String, TimeWindow> {
+public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple3<String, String, String>, Tuple2<String, String>, String, TimeWindow> {
     private static final Logger logger = LoggerFactory.getLogger(FirstCountWindowFunction.class);
 
     private static HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricsMap();
@@ -32,14 +30,15 @@ public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple4<Strin
 
     @Override
     @SuppressWarnings("unchecked")
-    public void process(String key, Context context, Iterable<Tuple4<String, String, String, String>> input, Collector<Tuple2<String, String>> output) {
+    public void process(String key, Context context, Iterable<Tuple3<String, String, String>> input, Collector<Tuple2<String, String>> output) {
         try {
-            for (Tuple4<String, String, String, String> tuple : input) {
-                String label = tuple.f1;
+            for (Tuple3<String, String, String> tuple : input) {
+                String label = tuple.f0;
+                String dimensions = tuple.f1;
+                String message = tuple.f2;
+                String l7_Protocol = label.substring(0, label.indexOf("@"));
                 //All functions for this protocol in action; fall back to Default if missing
-                String[] metricNames = actionMap.getOrDefault(label, actionMap.get("Default"));
-                String dimensions = tuple.f2;
-                String message = tuple.f3;
+                String[] metricNames = actionMap.getOrDefault(l7_Protocol, actionMap.get("Default"));
                 if (StringUtil.isNotBlank(message)) {
                     Map<String, Object> dimensionsObj = (Map<String, Object>) JsonMapper.fromJsonString(dimensions, Map.class);
                     Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
@@ -57,11 +56,9 @@ public class FirstCountWindowFunction extends ProcessWindowFunction<Tuple4<Strin
             }
 
             if (!cacheMap.isEmpty()) {
-                Long endTime = context.window().getEnd() / 1000;
-
-                for (String countKey : cacheMap.keySet()) {
-                    Map<String, Object> resultMap = cacheMap.get(countKey);
-                    output.collect(new Tuple2<>(countKey, JsonMapper.toJsonString(resultMap)));
+                for (String dimensions : cacheMap.keySet()) {
+                    Map<String, Object> resultMap = cacheMap.get(dimensions);
+                    output.collect(new Tuple2<>(dimensions, JsonMapper.toJsonString(resultMap)));
                 }
             }
diff --git a/src/main/java/com/zdjizhi/utils/functions/FirstKeyByFunction.java b/src/main/java/com/zdjizhi/utils/functions/FirstKeyByFunction.java
index e02893d..831c90a 100644
--- a/src/main/java/com/zdjizhi/utils/functions/FirstKeyByFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/FirstKeyByFunction.java
@@ -12,10 +12,10 @@ import org.apache.flink.api.java.tuple.Tuple4;
  * @Description:
  * @date 2021/7/2112:13
  */
-public class FirstKeyByFunction implements KeySelector<Tuple4<String, String, String, String>, String> {
+public class FirstKeyByFunction implements KeySelector<Tuple3<String, String, String>, String> {
 
     @Override
-    public String getKey(Tuple4<String, String, String, String> value) throws Exception {
+    public String getKey(Tuple3<String, String, String> value) throws Exception {
         //Group by the key assembled in the map stage
         return value.f0;
     }
diff --git a/src/main/java/com/zdjizhi/utils/functions/ParseMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/ParseMapFunction.java
index aa59e6b..bf67eb6 100644
--- a/src/main/java/com/zdjizhi/utils/functions/ParseMapFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/ParseMapFunction.java
@@ -10,7 +10,7 @@ import com.zdjizhi.utils.general.ParseFunctions;
 import com.zdjizhi.utils.json.JsonParseUtil;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -18,6 +18,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * @author qidaijie
@@ -25,7 +26,7 @@ import java.util.Map;
  * @Description:
  * @date 2021/5/2715:01
 */
-public class ParseMapFunction implements MapFunction<String,Tuple4<String,String, String, String>> {
+public class ParseMapFunction implements MapFunction<String, Tuple3<String, String, String>> {
     private static final Logger logger = LoggerFactory.getLogger(FirstCountWindowFunction.class);
 
     private static ArrayList<String[]> jobList = JsonParseUtil.getTransformsList();
@@ -34,11 +35,11 @@ public class ParseMapFunction implements MapFunction<String,Tuple4<String,String
 
     @Override
     @SuppressWarnings("unchecked")
-    public Tuple4<String,String, String, String> map(String message) {
+    public Tuple3<String, String, String> map(String message) {
         try {
             if (StringUtil.isNotBlank(message)) {
                 Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
-                String streamTraceId = JsonParseUtil.getString(object, "common_stream_trace_id");
+//                String streamTraceId = JsonMapperParseUtil.getString(object, "common_stream_trace_id");
                 Map<String, Object> dimensionsObj = ParseFunctions.transDimensions(dimensionsMap, object);
                 if (ParseFunctions.filterLogs(object)) {
                     for (String[] strings : jobList) {
@@ -76,7 +77,8 @@ public class ParseMapFunction implements MapFunction<String,Tuple4<String,String
                                 }
                                 break;
                             case "hierarchy":
-                                return new Tuple4<>(streamTraceId, JsonParseUtil.getString(object, logsKeyName), JsonMapper.toJsonString(dimensionsObj), JsonMapper.toJsonString(object));
+                                String key = JsonParseUtil.getString(object, logsKeyName) + "@" + ThreadLocalRandom.current().nextInt(StreamAggregateConfig.RANDOM_RANGE_NUM);
+                                return new Tuple3<>(key, JsonMapper.toJsonString(dimensionsObj), ParseFunctions.getMetricsLog(object));
                             default:
                                 break;
                         }
@@ -84,10 +86,10 @@ public class ParseMapFunction implements MapFunction<String,Tuple4<String,String
                 }
             }
         } catch (RuntimeException e) {
-            logger.error("Map Parse error,message:" + e);
-            return new Tuple4<>("", "", "", "");
+            logger.error("An error occurred in the original log parsing reorganization, error message is: " + e);
+            return new Tuple3<>("", "", "");
         }
-        return new Tuple4<>("", "", "", "");
+        return new Tuple3<>("", "", "");
     }
 
     /**
diff --git a/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java b/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java
index be78433..d458984 100644
--- a/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/ResultFlatMapFunction.java
@@ -38,7 +38,6 @@ public class ResultFlatMapFunction implements FlatMapFunction<String, String> {
                     out.collect(JsonMapper.toJsonString(jsonObject));
                 }
             }
-//            out.collect(value);
         }
     }
 }
```
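`ParseMapFunction` and `FirstCountWindowFunction` now share an implicit contract: the salt is appended as `"@<n>"` at parse time and stripped at the first window. A standalone sketch of that round trip (names are illustrative; the range constant mirrors `random.range.num`):

```java
import java.util.concurrent.ThreadLocalRandom;

public class SaltedKeySketch {
    // Mirrors random.range.num from default_config.properties.
    private static final int RANDOM_RANGE_NUM = 40;

    // ParseMapFunction side: append "@<salt>" so one hot protocol key
    // fans out across up to RANDOM_RANGE_NUM first-stage sub-keys.
    static String salt(String protocolKey) {
        return protocolKey + "@" + ThreadLocalRandom.current().nextInt(RANDOM_RANGE_NUM);
    }

    // FirstCountWindowFunction side: strip the salt to recover the
    // logical protocol before looking up its metric functions.
    static String unsalt(String saltedKey) {
        return saltedKey.substring(0, saltedKey.indexOf('@'));
    }

    public static void main(String[] args) {
        String salted = salt("http");        // e.g. "http@17"
        System.out.println(salted);
        System.out.println(unsalt(salted));  // always "http"
    }
}
```

Note the scheme assumes the logical key itself never contains `"@"`, since the first `"@"` is taken as the salt separator.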
```diff
diff --git a/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java b/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java
index 794b4ea..2d6b546 100644
--- a/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java
+++ b/src/main/java/com/zdjizhi/utils/functions/SecondCountWindowFunction.java
@@ -26,7 +26,6 @@ public class SecondCountWindowFunction extends ProcessWindowFunction<Tuple2<Stri
     private static final Logger logger = LoggerFactory.getLogger(SecondCountWindowFunction.class);
 
     private static HashMap<String, String[]> metricsMap = JsonParseUtil.getMetricsMap();
-    private static HashMap<String, String[]> actionMap = JsonParseUtil.getActionMap();
     private HashMap<String, Map<String, Object>> cacheMap = new HashMap<>(320);
 
     private static String resultTimeKey = JsonParseUtil.getTimeKey();
@@ -40,17 +39,15 @@ public class SecondCountWindowFunction extends ProcessWindowFunction<Tuple2<Stri
             if (StringUtil.isNotBlank(message)) {
                 Map<String, Object> dimensionsObj = (Map<String, Object>) JsonMapper.fromJsonString(dimensions, Map.class);
                 Map<String, Object> object = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
-                String label = JsonParseUtil.getString(object, "protocol_id");
-                //All functions for this protocol in action; fall back to Default if missing
-                String[] metricNames = actionMap.getOrDefault(label, actionMap.get("Default"));
 
                 Map<String, Object> cacheMessage = cacheMap.getOrDefault(dimensions, dimensionsObj);
 
-                for (String name : metricNames) {
+                for (String name : metricsMap.keySet()) {
                     String[] metrics = metricsMap.get(name);
                     String function = metrics[0];
                     functionSet(function, cacheMessage, name, cacheMessage.get(name), JsonParseUtil.getValue(object, name));
                 }
+                cacheMap.put(dimensions, cacheMessage);
             }
         }
```
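The one-line `cacheMap.put(dimensions, cacheMessage)` is easy to miss but is the substantive fix in this file: `getOrDefault` returns the fallback map without inserting it, so aggregates written into a first-seen entry were never stored back. A minimal sketch of the pitfall (illustrative names, not repo code):

```java
import java.util.HashMap;
import java.util.Map;

public class GetOrDefaultPitfall {
    public static void main(String[] args) {
        Map<String, Map<String, Object>> cache = new HashMap<>();

        Map<String, Object> fresh = new HashMap<>();
        // getOrDefault returns `fresh` but does NOT insert it into the map:
        Map<String, Object> entry = cache.getOrDefault("dims", fresh);
        entry.put("count", 1);
        System.out.println(cache.containsKey("dims")); // false -> update was lost

        // The fix mirrors SecondCountWindowFunction: write the entry back.
        cache.put("dims", entry);
        System.out.println(cache.containsKey("dims")); // true
    }
}
```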
```diff
diff --git a/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java b/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java
index 6bb3178..6933e7c 100644
--- a/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java
+++ b/src/main/java/com/zdjizhi/utils/general/ParseFunctions.java
@@ -3,9 +3,11 @@ package com.zdjizhi.utils.general;
 
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
+import com.zdjizhi.utils.JsonMapper;
 import com.zdjizhi.utils.StringUtil;
 import com.zdjizhi.utils.json.JsonParseUtil;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -21,6 +23,7 @@ public class ParseFunctions {
      */
     private static HashMap<String, String> filtersMap = JsonParseUtil.getFiltersMap();
+    private static ArrayList<String> metricsList = JsonParseUtil.getLogMetrics();
 
     /**
      * Parse the dimensions field set
@@ -62,36 +65,15 @@ public class ParseFunctions {
         return available;
     }
 
-//    /**
-//     * Update the cached relation map
-//     *
-//     * @param hashMap the currently cached relation map
-//     */
-//    public static void updateAppRelation(HashMap<Integer, String> hashMap) {
-//        try {
-//            Long begin = System.currentTimeMillis();
-//            String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.APP_ID_HTTP);
-//            if (StringUtil.isNotBlank(schema)) {
-//                String data = JSONObject.parseObject(schema).getString("data");
-//                JSONArray objects = JSONArray.parseArray(data);
-//                for (Object object : objects) {
-//                    JSONArray jsonArray = JSONArray.parseArray(object.toString());
-//                    int key = jsonArray.getInteger(0);
-//                    String value = jsonArray.getString(1);
-//                    if (hashMap.containsKey(key)) {
-//                        if (!value.equals(hashMap.get(key))) {
-//                            hashMap.put(key, value);
-//                        }
-//                    } else {
-//                        hashMap.put(key, value);
-//                    }
-//                }
-//                logger.warn("Time spent refreshing cached relations: " + (begin - System.currentTimeMillis()));
-//                logger.warn("Refreshed cached APP relations; fetched list length: [" + objects.size());
-//            }
-//        } catch (RuntimeException e) {
-//            logger.error("Failed to refresh cached APP-ID, exception: " + e);
-//        }
-//    }
+    public static String getMetricsLog(Map<String, Object> object) {
+
+        Map<String, Object> json = new HashMap<>(16);
+
+        for (String fileName : metricsList) {
+            json.put(fileName, object.get(fileName));
+        }
+
+        return JsonMapper.toJsonString(json);
+    }
 }
diff --git a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
index 4a6a01c..0ebe8e1 100644
--- a/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
+++ b/src/main/java/com/zdjizhi/utils/json/JsonParseUtil.java
@@ -232,6 +232,24 @@ public class JsonParseUtil {
     }
 
     /**
+     * Fetch the metric fields from the schema, used to filter raw logs
+     *
+     * @return the original metric column names
+     */
+    public static ArrayList<String> getLogMetrics() {
+        ArrayList<String> list = new ArrayList<>();
+        String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP);
+        DocumentContext parse = JsonPath.parse(schema);
+
+        List<Object> metrics = parse.read("$.data.doc.metrics[*]");
+
+        for (Object metric : metrics) {
+            list.add(JsonPath.read(metric, "$.fieldName"));
+        }
+        return list;
+    }
+
+    /**
      * Fetch a map from the gateway schema URL (a String), used to build an Object-typed instance
      *
      * @return a map used to reflectively build schema-typed objects
diff --git a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java
index 01e8540..9cb0631 100644
--- a/src/main/java/com/zdjizhi/utils/json/TypeUtils.java
+++ b/src/main/java/com/zdjizhi/utils/json/TypeUtils.java
@@ -39,14 +39,6 @@ public class TypeUtils {
         return ((Number) value).longValue();
     }
 
-//        if (value instanceof Map) {
-//            return (Map) value;
-//        }
-//
-//        if (value instanceof List) {
-//            return Collections.singletonList(value.toString());
-//        }
-
     if (value instanceof Boolean) {
         return (Boolean) value ? 1 : 0;
     }
```
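`getLogMetrics` walks the gateway schema with JsonPath. A self-contained sketch of the same two-step read, using an invented payload in place of the response fetched from `schema.http` (the document shape is assumed from the `$.data.doc.metrics[*]` path; field names here are made up):

```java
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;

import java.util.ArrayList;
import java.util.List;

public class SchemaMetricsSketch {
    public static void main(String[] args) {
        // Invented stand-in for the gateway response fetched from schema.http.
        String schema = "{\"data\":{\"doc\":{\"metrics\":["
                + "{\"fieldName\":\"in_bytes\",\"function\":\"sum\"},"
                + "{\"fieldName\":\"out_bytes\",\"function\":\"sum\"}]}}}";

        DocumentContext parsed = JsonPath.parse(schema);
        // First read: every element of the metrics array.
        List<Object> metrics = parsed.read("$.data.doc.metrics[*]");

        ArrayList<String> fieldNames = new ArrayList<>();
        for (Object metric : metrics) {
            // Second read: each element is a Map-like node; take its fieldName leaf.
            fieldNames.add(JsonPath.read(metric, "$.fieldName"));
        }
        System.out.println(fieldNames); // [in_bytes, out_bytes]
    }
}
```

The returned list is what `ParseFunctions.getMetricsLog` uses to project each raw log down to its metric fields before the logs enter the first window, which shrinks the per-record payload carried between the two stages.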
